Coverage for src/tests/test_auth.py: 100%

588 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1import http.cookies 

2 

3import grpc 

4import pytest 

5from google.protobuf import empty_pb2, wrappers_pb2 

6from sqlalchemy.sql import delete, func 

7 

8from couchers import errors, urls 

9from couchers.crypto import hash_password, random_hex 

10from couchers.db import session_scope 

11from couchers.models import ( 

12 ContributeOption, 

13 ContributorForm, 

14 InviteCode, 

15 LoginToken, 

16 PasswordResetToken, 

17 SignupFlow, 

18 Upload, 

19 User, 

20 UserSession, 

21) 

22from couchers.sql import couchers_select as select 

23from proto import api_pb2, auth_pb2 

24from tests.test_fixtures import ( # noqa 

25 api_session, 

26 auth_api_session, 

27 db, 

28 email_fields, 

29 fast_passwords, 

30 generate_user, 

31 mock_notification_email, 

32 push_collector, 

33 real_api_session, 

34 testconfig, 

35) 

36 

37 

@pytest.fixture(autouse=True)
def _(testconfig, fast_passwords):
    """Autouse fixture that requests `testconfig` and `fast_passwords` for every test in this module."""

41 

42 

def get_session_cookie_tokens(metadata_interceptor):
    """Return ``(session_token, user_id)`` read from the latest ``set-cookie`` response headers.

    Args:
        metadata_interceptor: object whose ``latest_header_raw`` attribute is an iterable of
            ``(key, value)`` header pairs.

    Returns:
        A 2-tuple of strings: the value of the ``couchers-sesh`` cookie and the value of the
        ``couchers-user-id`` cookie.

    Raises:
        IndexError: if no ``set-cookie`` header defines one of the two cookies.
    """
    # Parse every set-cookie header once so cookies are looked up by their exact name rather than
    # by a substring match on the raw header text (which could select an unrelated cookie).
    jars = [
        http.cookies.SimpleCookie(value)
        for key, value in metadata_interceptor.latest_header_raw
        if key == "set-cookie"
    ]

    def first_value(cookie_name):
        # The first header defining the cookie wins.
        for jar in jars:
            if cookie_name in jar:
                return jar[cookie_name].value
        raise IndexError(f"no set-cookie header defines {cookie_name!r}")

    return first_value("couchers-sesh"), first_value("couchers-user-id")

48 

49 

def test_UsernameValid(db):
    """UsernameValid reports "test" as valid and the empty string as not valid."""
    with auth_api_session() as (auth_api, metadata_interceptor):
        ok_reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test"))
        assert ok_reply.valid

    with auth_api_session() as (auth_api, metadata_interceptor):
        empty_reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username=""))
        assert not empty_reply.valid

56 

57 

def test_signup_incremental(db):
    """Drive the signup flow one step per request and check the outstanding steps after each reply."""

    def assert_still_in_progress(reply, token, *, verify_email, guidelines):
        # Every intermediate reply in this test keeps the flow token, carries no auth result and
        # still asks for account details; only the two keyword flags vary between steps.
        assert reply.flow_token == token
        assert not reply.HasField("auth_res")
        assert not reply.need_basic
        assert reply.need_account
        assert not reply.need_feedback
        assert bool(reply.need_verify_email) == verify_email
        assert bool(reply.need_accept_community_guidelines) == guidelines

    # Start the flow with only the basic info
    with auth_api_session() as (auth_api, metadata_interceptor):
        basic_info = auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic_info))

    flow_token = res.flow_token
    assert flow_token
    assert_still_in_progress(res, flow_token, verify_email=True, guidelines=True)

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # Polling with just the flow token changes nothing
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token))

    assert_still_in_progress(res, flow_token, verify_email=True, guidelines=True)

    # Add feedback
    with auth_api_session() as (auth_api, metadata_interceptor):
        contributor_form = auth_pb2.ContributorForm(
            ideas="I'm a robot, incapable of original ideation",
            features="I love all your features",
            experience="I haven't done couch surfing before",
            contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
            contribute_ways=["serving", "backend"],
            expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
        )
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, feedback=contributor_form))

    assert_still_in_progress(res, flow_token, verify_email=True, guidelines=True)

    # Agree to community guidelines
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                flow_token=flow_token,
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

    assert_still_in_progress(res, flow_token, verify_email=True, guidelines=False)

    # Verify email
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token))

    assert_still_in_progress(res, flow_token, verify_email=False, guidelines=False)

    # Finally finish off account info
    with auth_api_session() as (auth_api, metadata_interceptor):
        account_info = auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        )
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, account=account_info))

    # The flow is complete: no flow token, an auth result, and nothing left to do
    assert not res.flow_token
    assert res.HasField("auth_res")
    user_id = res.auth_res.user_id
    assert user_id
    assert not res.auth_res.jailed
    for pending_flag in (
        "need_basic",
        "need_account",
        "need_feedback",
        "need_verify_email",
        "need_accept_community_guidelines",
    ):
        assert not getattr(res, pending_flag)

    # The cookies set by the last reply identify the new user
    sess_token, uid = get_session_cookie_tokens(metadata_interceptor)
    assert uid == str(user_id)

    with api_session(sess_token) as api:
        profile = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))

    assert profile.username == "frodo"
    assert profile.gender == "Bot"
    assert profile.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
    assert profile.city == "New York City"
    assert profile.lat == 40.7331
    assert profile.lng == -73.9778
    assert profile.radius == 500

    # The feedback submitted during signup was stored as the single ContributorForm row
    with session_scope() as session:
        form = session.execute(select(ContributorForm)).scalar_one()

        assert form.ideas == "I'm a robot, incapable of original ideation"
        assert form.features == "I love all your features"
        assert form.experience == "I haven't done couch surfing before"
        assert form.contribute == ContributeOption.yes
        assert form.contribute_ways == ["serving", "backend"]
        assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"

206 

207 

def _quick_signup():
    """Sign up user "frodo" in two requests: submit all signup data, then verify the email.

    Asserts that the first reply only asks for email verification, that the second reply carries
    an auth result, and that the session cookie equals the session token stored for "frodo".
    """
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        first_reply = auth_api.SignupFlow(signup_req)

    flow_token = first_reply.flow_token

    assert flow_token
    assert not first_reply.HasField("auth_res")
    assert not first_reply.need_basic
    assert not first_reply.need_account
    assert not first_reply.need_feedback
    assert first_reply.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        final_reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not final_reply.flow_token
    assert final_reply.HasField("auth_res")
    assert final_reply.auth_res.user_id
    assert not final_reply.auth_res.jailed
    assert not final_reply.need_basic
    assert not final_reply.need_account
    assert not final_reply.need_feedback
    assert not final_reply.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        frodo_session_query = (
            select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        )
        token = session.execute(frodo_session_query).scalar_one().token
    sesh, uid = get_session_cookie_tokens(metadata_interceptor)
    assert sesh == token

267 

268 

def test_signup(db):
    """Run the `_quick_signup` helper on its own; all assertions live inside the helper."""
    _quick_signup()

271 

272 

def test_basic_login(db):
    """Log in with username and password, check the cookie token maps to a valid session, then log out."""
    # Create our test user using signup
    _quick_signup()

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password"))

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)

    with session_scope() as session:
        user_session = session.execute(
            select(UserSession)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == "frodo")
            .where(UserSession.token == reply_token)
            .where(UserSession.is_valid)
        ).scalar_one_or_none()
        # scalar_one_or_none() returns None when no row matches; fail with a readable assertion
        # instead of an AttributeError from dereferencing None.
        assert user_session is not None, "no valid session matches the token returned in the cookie"
        assert user_session.token

    # log out
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),))

297 

298 

def test_login_part_signed_up_verified_email(db):
    """
    If you try to log in but didn't finish signing up, we send you a new email and ask you to finish signing up.
    """
    with auth_api_session() as (auth_api, metadata_interceptor):
        basic_info = auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic_info))

    flow_token = res.flow_token
    assert res.need_verify_email

    # verify the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        flow_token, email_token = flow.flow_token, flow.email_token
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # logging in with the email of the unfinished signup is rejected and triggers exactly one email
    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            login_req = auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd")
            with pytest.raises(grpc.RpcError) as exc_info:
                auth_api.Authenticate(login_req)
            assert exc_info.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == "email@couchers.org.invalid"
    assert flow_token in sent.plain
    assert flow_token in sent.html

330 

331 

def test_login_part_signed_up_not_verified_email(db):
    """Logging in as a user whose signup email is still unverified is rejected and sends one email.

    The email sent in response is expected to contain the flow's email token.
    """
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1999-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(signup_req)

    flow_token = res.flow_token
    assert res.need_verify_email

    with mock_notification_email() as mock:
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as exc_info:
                auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
            assert exc_info.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP

    # read the email token back from the stored flow
    with session_scope() as session:
        stored_flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = stored_flow.email_token

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == "email@couchers.org.invalid"
    assert email_token in sent.plain
    assert email_token in sent.html

370 

371 

def test_banned_user(db):
    """A user flagged `is_banned` cannot authenticate: FAILED_PRECONDITION / ACCOUNT_SUSPENDED."""
    _quick_signup()

    # flag the only user in the database as banned
    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.is_banned = True

    with auth_api_session() as (auth_api, metadata_interceptor):
        credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.Authenticate(credentials)
        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert failure.details() == errors.ACCOUNT_SUSPENDED

383 

384 

def test_deleted_user(db):
    """A user flagged `is_deleted` cannot authenticate: NOT_FOUND / ACCOUNT_NOT_FOUND."""
    _quick_signup()

    # flag the only user in the database as deleted
    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.is_deleted = True

    with auth_api_session() as (auth_api, metadata_interceptor):
        credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.Authenticate(credentials)
        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.NOT_FOUND
        assert failure.details() == errors.ACCOUNT_NOT_FOUND

396 

397 

def test_invalid_token(db):
    """A request made with a random session token fails with UNAUTHENTICATED / "Unauthorized"."""
    # Two users are generated; only the second one's username is used in the request.
    generate_user()
    target_user, _ = generate_user()

    bogus_token = random_hex(32)

    with real_api_session(bogus_token) as api:
        with pytest.raises(grpc.RpcError) as exc_info:
            api.GetUser(api_pb2.GetUserReq(user=target_user.username))

    failure = exc_info.value
    assert failure.code() == grpc.StatusCode.UNAUTHENTICATED
    assert failure.details() == "Unauthorized"

409 

410 

def test_password_reset_v2(db, push_collector):
    """Full password reset: request a reset, reject a bad password, accept a good one, reject token reuse.

    Also checks the reset email contents, the two push notifications, and that completing the
    reset sets a session cookie whose token belongs to a valid session of the user.
    """
    user, token = generate_user(hashed_password=hash_password("mypassword"))

    with auth_api_session() as (auth_api, metadata_interceptor):
        with mock_notification_email() as mock:
            auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    with session_scope() as session:
        password_reset_token = session.execute(select(PasswordResetToken)).scalar_one().token

    assert mock.call_count == 1
    sent = email_fields(mock)
    assert sent.recipient == user.email
    assert "reset" in sent.subject.lower()
    assert password_reset_token in sent.plain
    assert password_reset_token in sent.html
    unique_string = "You asked for your password to be reset on Couchers.org."
    assert unique_string in sent.plain
    assert unique_string in sent.html
    assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in sent.plain
    assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in sent.html
    assert "support@couchers.org" in sent.plain
    assert "support@couchers.org" in sent.html

    push_collector.assert_user_push_matches_fields(
        user.id,
        title="A password reset was initiated on your account",
        body="Someone initiated a password change on your account.",
    )

    # make sure bad password are caught
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password")
            )
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert e.value.details() == errors.INSECURE_PASSWORD

    # make sure we can set a good password
    with auth_api_session() as (auth_api, metadata_interceptor):
        pwd = random_hex()
        with mock_notification_email() as mock:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
            )

    push_collector.assert_user_push_matches_fields(
        user.id,
        ix=1,
        title="Your password was successfully reset",
        body="Your password on Couchers.org was changed. If that was you, then no further action is needed.",
    )

    session_token, _ = get_session_cookie_tokens(metadata_interceptor)

    with session_scope() as session:
        new_session = session.execute(
            select(UserSession)
            .join(User, UserSession.user_id == User.id)
            .where(User.username == user.username)
            .where(UserSession.token == session_token)
            .where(UserSession.is_valid)
        ).scalar_one_or_none()
        # scalar_one_or_none() returns None when no row matches; fail with a readable assertion
        # instead of an AttributeError from dereferencing None.
        assert new_session is not None, "no valid session matches the token returned in the cookie"
        assert new_session.token

    # make sure we can't set a password again
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.CompletePasswordResetV2(
                auth_pb2.CompletePasswordResetV2Req(
                    password_reset_token=password_reset_token, new_password=random_hex()
                )
            )
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVALID_TOKEN

    # the stored hash is the one for the good password, not for the rejected attempts
    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(pwd)

493 

494 

def test_password_reset_no_such_user(db):
    """Requesting a reset for an unknown user returns normally and stores no PasswordResetToken."""
    generate_user()

    with auth_api_session() as (auth_api, metadata_interceptor):
        reset_req = auth_pb2.ResetPasswordReq(user="nonexistentuser")
        auth_api.ResetPassword(reset_req)

    with session_scope() as session:
        stored_token = session.execute(select(PasswordResetToken)).scalar_one_or_none()

    assert stored_token is None

509 

510 

def test_password_reset_invalid_token_v2(db):
    """Completing a reset with a wrong token fails with NOT_FOUND / INVALID_TOKEN and keeps the old hash."""
    password = random_hex()
    created_user, _ = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=created_user.username))

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken"))
    failure = exc_info.value
    assert failure.code() == grpc.StatusCode.NOT_FOUND
    assert failure.details() == errors.INVALID_TOKEN

    with session_scope() as session:
        stored_user = session.execute(select(User)).scalar_one()
        assert stored_user.hashed_password == hash_password(password)

530 

531 

def test_logout_invalid_token(db):
    """Deauthenticate still succeeds after all LoginToken rows are deleted and sets an empty session cookie."""
    # Create our test user using signup
    _quick_signup()

    with auth_api_session() as (auth_api, metadata_interceptor):
        credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
        auth_api.Authenticate(credentials)

    login_token, _ = get_session_cookie_tokens(metadata_interceptor)

    # delete all login tokens
    with session_scope() as session:
        session.execute(delete(LoginToken))

    # log out with non-existent token should still return a valid result
    with auth_api_session() as (auth_api, metadata_interceptor):
        cookie_metadata = (("cookie", f"couchers-sesh={login_token}"),)
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=cookie_metadata)

    cleared_token, _ = get_session_cookie_tokens(metadata_interceptor)
    # make sure we set an empty cookie
    assert cleared_token == ""

552 

553 

def test_signup_without_password(db):
    """Signing up with the password "bad" is rejected with INVALID_ARGUMENT / PASSWORD_TOO_SHORT."""
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="bad",
            city="Minas Tirith",
            birthdate="9999-12-31",  # arbitrary future birthdate
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.SignupFlow(signup_req)
        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert failure.details() == errors.PASSWORD_TOO_SHORT

577 

578 

def test_signup_invalid_birthdate(db):
    """Future and under-18 birthdates are rejected with INVALID_BIRTHDATE; an adult birthdate is accepted.

    Only the accepted signup may leave a SignupFlow row behind, so exactly one row is expected.
    """
    import datetime

    # A fixed date such as "2010-04-09" stops being "< 18 yrs" once the calendar reaches 2028-04-09,
    # at which point the test would start failing. Derive the minor's birthdate from today instead:
    # 1 January ten years ago is always a valid date and always between 10 and 11 years back.
    underage_birthdate = f"{datetime.date.today().year - 10:04d}-01-01"

    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                auth_pb2.SignupFlowReq(
                    basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
                    account=auth_pb2.SignupAccount(
                        username="frodo",
                        password="a very insecure password",
                        city="Minas Tirith",
                        birthdate="9999-12-31",  # arbitrary future birthdate
                        gender="Robot",
                        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                        lat=1,
                        lng=1,
                        radius=100,
                        accept_tos=True,
                    ),
                    feedback=auth_pb2.ContributorForm(),
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.INVALID_BIRTHDATE

        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="Christopher", email="a2@b.com"),
                account=auth_pb2.SignupAccount(
                    username="ceelo",
                    password="a very insecure password",
                    city="New York City",
                    birthdate="2000-12-31",  # arbitrary birthdate older than 18 years
                    gender="Helicopter",
                    hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                    lat=1,
                    lng=1,
                    radius=100,
                    accept_tos=True,
                ),
                feedback=auth_pb2.ContributorForm(),
            )
        )

        assert res.flow_token

        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                auth_pb2.SignupFlowReq(
                    basic=auth_pb2.SignupBasic(name="Franklin", email="a3@b.com"),
                    account=auth_pb2.SignupAccount(
                        username="franklin",
                        password="a very insecure password",
                        city="Los Santos",
                        birthdate=underage_birthdate,  # birthdate < 18 yrs, relative to today
                        gender="Male",
                        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                        lat=1,
                        lng=1,
                        radius=100,
                        accept_tos=True,
                    ),
                    feedback=auth_pb2.ContributorForm(),
                )
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.INVALID_BIRTHDATE

        with session_scope() as session:
            assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1

648 

649 

def test_signup_invalid_email(db):
    """Each malformed email address is rejected with INVALID_ARGUMENT / INVALID_EMAIL."""
    malformed_emails = ("a", "a@b", "a@b.", "a@b.c")

    # one fresh auth session per address
    for bad_email in malformed_emails:
        with auth_api_session() as (auth_api, metadata_interceptor):
            signup_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=bad_email))
            with pytest.raises(grpc.RpcError) as exc_info:
                auth_api.SignupFlow(signup_req)
            failure = exc_info.value
            assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
            assert failure.details() == errors.INVALID_EMAIL

674 

675 

def test_signup_existing_email(db):
    """Starting a signup with an existing user's email fails: FAILED_PRECONDITION / SIGNUP_FLOW_EMAIL_TAKEN."""
    # Signed up user
    existing_user, _ = generate_user()

    with auth_api_session() as (auth_api, metadata_interceptor):
        signup_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=existing_user.email))
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.SignupFlow(signup_req)
        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert failure.details() == errors.SIGNUP_FLOW_EMAIL_TAKEN

687 

688 

def test_signup_continue_with_email(db):
    """Starting a second signup with an email that already has a flow fails with SIGNUP_FLOW_EMAIL_STARTED_SIGNUP."""
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    def start_signup(auth_api):
        # the same basic-info request is sent both times
        return auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))
        )

    with auth_api_session() as (auth_api, metadata_interceptor):
        first_reply = start_signup(auth_api)
    assert first_reply.flow_token

    # continue with same email, should just send another email to the user
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as exc_info:
            start_signup(auth_api)
        failure = exc_info.value
        assert failure.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert failure.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP

704 

705 

def test_signup_resend_email(db):
    """Asking to resend the verification email sends one email with the same token, which then completes signup."""
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        with mock_notification_email() as mock:
            res = auth_api.SignupFlow(signup_req)
        assert mock.call_count == 1
        first_email = email_fields(mock)
        assert first_email.recipient == "email@couchers.org.invalid"

    flow_token = res.flow_token
    assert flow_token

    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert flow.flow_token == flow_token
        assert flow.email_sent
        assert not flow.email_verified
        email_token = flow.email_token

    # ask for a new signup email
    with auth_api_session() as (auth_api, metadata_interceptor):
        resend_req = auth_pb2.SignupFlowReq(flow_token=flow_token, resend_verification_email=True)
        with mock_notification_email() as mock:
            res = auth_api.SignupFlow(resend_req)
        assert mock.call_count == 1
        resent_email = email_fields(mock)
        assert resent_email.recipient == "email@couchers.org.invalid"
        assert email_token in resent_email.plain
        assert email_token in resent_email.html

    # resending alone does not verify the email
    with session_scope() as session:
        flow = session.execute(select(SignupFlow)).scalar_one()
        assert not flow.email_verified

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not res.flow_token
    assert res.HasField("auth_res")

770 

771 

def test_successful_authenticate(db):
    """Authenticating with the username or with the email succeeds and the reply is not jailed."""
    known_user, _ = generate_user(hashed_password=hash_password("password"))

    # Authenticate with username, then with email, each in its own session
    for identifier in (known_user.username, known_user.email):
        with auth_api_session() as (auth_api, metadata_interceptor):
            reply = auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password="password"))
        assert not reply.jailed

784 

785 

def test_unsuccessful_authenticate(db):
    """Bad credentials all fail with NOT_FOUND; the details distinguish a wrong password from an unknown account."""
    known_user, _ = generate_user(hashed_password=hash_password("password"))

    # (user identifier, password, expected error details)
    failing_logins = [
        # Invalid password
        (known_user.username, "incorrectpassword", errors.INVALID_PASSWORD),
        # Invalid username
        ("notarealusername", "password", errors.ACCOUNT_NOT_FOUND),
        # Invalid email
        (f"{random_hex(12)}@couchers.org.invalid", "password", errors.ACCOUNT_NOT_FOUND),
        # Invalid id
        ("-1", "password", errors.ACCOUNT_NOT_FOUND),
    ]

    for identifier, password, expected_details in failing_logins:
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as exc_info:
                auth_api.Authenticate(auth_pb2.AuthReq(user=identifier, password=password))
            assert exc_info.value.code() == grpc.StatusCode.NOT_FOUND
            assert exc_info.value.details() == expected_details

818 

819 

def test_complete_signup(db):
    """Invalid account or basic details submitted to a signup flow are rejected with the matching error."""
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, metadata_interceptor):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
        )

    flow_token = started.flow_token

    # Account fields shared by every account submission below; each case overrides one or two of them.
    account_defaults = {
        "username": "frodo",
        "password": "a very insecure password",
        "city": "Minas Tirith",
        "birthdate": "1980-12-31",
        "gender": "Robot",
        "hosting_status": api_pb2.HOSTING_STATUS_CAN_HOST,
        "lat": 1,
        "lng": 1,
        "radius": 100,
        "accept_tos": True,
    }

    def rejected_account(**overrides):
        # Submit the account step in a fresh session and return the RpcError it must raise.
        account = auth_pb2.SignupAccount(**{**account_defaults, **overrides})
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as exc_info:
                auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, account=account))
        return exc_info.value

    # Invalid username
    failure = rejected_account(username=" ")
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == errors.INVALID_USERNAME

    # Invalid name
    with auth_api_session() as (auth_api, metadata_interceptor):
        blank_name = auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")
        with pytest.raises(grpc.RpcError) as exc_info:
            auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=blank_name))
        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert exc_info.value.details() == errors.INVALID_NAME

    # Hosting status required
    failure = rejected_account(hosting_status=None)
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == errors.HOSTING_STATUS_REQUIRED

    # Username unavailable
    existing_user, _ = generate_user()
    failure = rejected_account(username=existing_user.username)
    assert failure.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert failure.details() == errors.USERNAME_NOT_AVAILABLE

    # Invalid coordinate
    failure = rejected_account(lat=0, lng=0)
    assert failure.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert failure.details() == errors.INVALID_COORDINATE

932 

933 

def test_signup_token_regression(db):
    """
    Regression test for restarting a signup with an email address that has already been confirmed.

    Repro steps:
    1. Start a signup
    2. Confirm the email
    3. Start a new signup with the same email

    The failure being guarded against was `AttributeError: 'SignupFlow' object has no attribute 'token'`; this test
    asserts that step 3 instead fails with FAILED_PRECONDITION / SIGNUP_FLOW_EMAIL_STARTED_SIGNUP.
    """
    email = f"{random_hex(12)}@couchers.org.invalid"
    basic_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=email))

    # Step 1: start a signup
    with auth_api_session() as (auth_api, metadata_interceptor):
        flow_token = auth_api.SignupFlow(basic_req).flow_token
        assert flow_token

    # Step 2: confirm the email, reading the email token straight out of the database
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, email_token=email_token))

    # Step 3: start a new signup with the same email
    with auth_api_session() as (auth_api, metadata_interceptor):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(
                auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=email))
            )
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP

972 

973 

@pytest.mark.parametrize("opt_out", [True, False])
def test_opt_out_of_newsletter(db, opt_out):
    """
    Signs a user up in a single request with `opt_out_of_newsletter` set to the parametrized value, confirms the
    email, and checks that the created user stores that value and is not marked as in sync with the newsletter.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
        opt_out_of_newsletter=opt_out,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        started = auth_api.SignupFlow(signup_req)

    # the email token is read directly from the database
    with session_scope() as session:
        flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == started.flow_token)).scalar_one()
        email_token = flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        finished = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    new_user_id = finished.auth_res.user_id

    with session_scope() as session:
        created = session.execute(select(User).where(User.id == new_user_id)).scalar_one()
        assert not created.in_sync_with_newsletter
        assert created.opt_out_of_newsletter == opt_out

1012 

1013 

def test_GetAuthState(db):
    """
    Exercises GetAuthState without a session cookie, with a valid session (before and after Deauthenticate), and
    with the session of a user generated with `accepted_tos=0`, which is expected to be reported as jailed.
    """
    user, token = generate_user()
    jailed_user, jailed_token = generate_user(accepted_tos=0)

    def session_cookie(value):
        # gRPC call metadata carrying the session token as a cookie
        return (("cookie", f"couchers-sesh={value}"),)

    # no cookie: not logged in
    with auth_api_session() as (auth_api, metadata_interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty())
        assert not state.logged_in
        assert not state.HasField("auth_res")

    with auth_api_session() as (auth_api, metadata_interceptor):
        # valid session: logged in and not jailed
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == user.id
        assert not state.auth_res.jailed

        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=session_cookie(token))

        # the same token no longer counts as logged in after Deauthenticate
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(token))
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # session of the user created with accepted_tos=0: logged in, but jailed
    with auth_api_session() as (auth_api, metadata_interceptor):
        state = auth_api.GetAuthState(empty_pb2.Empty(), metadata=session_cookie(jailed_token))
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == jailed_user.id
        assert state.auth_res.jailed

1042 

1043 

def test_signup_no_feedback_regression(db):
    """
    When the feedback form was first removed, the backend reported that it was not needed but then did not complete
    the signup. This regression test signs up without sending a feedback form and checks that the signup completes
    once the email is verified.
    """
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(signup_req)

    flow_token = res.flow_token

    # only the email verification should still be outstanding
    assert flow_token
    assert not res.HasField("auth_res")
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert res.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert signup_flow.email_sent
        assert not signup_flow.email_verified
        email_token = signup_flow.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # the flow is finished: no flow token, nothing left to do, and an auth result is returned
    assert not res.flow_token
    assert res.HasField("auth_res")
    assert res.auth_res.user_id
    assert not res.auth_res.jailed
    assert not res.need_basic
    assert not res.need_account
    assert not res.need_feedback
    assert not res.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        user_session = session.execute(
            select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        ).scalar_one()
        token = user_session.token
    sesh, _uid = get_session_cookie_tokens(metadata_interceptor)
    assert sesh == token

1106 

1107 

def test_banned_username(db):
    """
    Starts a signup flow and checks that submitting the account step with the username "thecouchersadminaccount"
    is rejected with FAILED_PRECONDITION / USERNAME_NOT_AVAILABLE.
    """
    email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (auth_api, metadata_interceptor):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=email))
        )

    flow_token = started.flow_token

    banned_account = auth_pb2.SignupAccount(
        username="thecouchersadminaccount",
        password="a very insecure password",
        city="Minas Tirith",
        birthdate="1980-12-31",
        gender="Robot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        lat=1,
        lng=1,
        radius=100,
        accept_tos=True,
    )

    with auth_api_session() as (auth_api, metadata_interceptor):
        # Banned username
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token, account=banned_account))
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == errors.USERNAME_NOT_AVAILABLE

1139 

1140 

# ConfirmChangeEmail is tested in test_account.py, as part of the test_ChangeEmail_* tests

1142 

1143 

def test_GetInviteCodeInfo(db):
    """
    Gives a user an avatar and an invite code, then checks that GetInviteCodeInfo returns the creator's name and
    username, a thumbnail URL for the avatar, and the invite link for the code.
    """
    user, token = generate_user()
    code_id = "TST12345"

    with session_scope() as session:
        upload = Upload(
            key="test_avatar.jpg",
            filename="test_avatar.jpg",
            creator_user_id=user.id,
        )
        session.add(upload)
        # flush the upload before the user row is pointed at its key
        session.flush()

        creator = session.execute(select(User).where(User.id == user.id)).scalar_one()
        creator.avatar_key = upload.key

        session.add(InviteCode(id=code_id, creator_user_id=user.id))
        session.commit()

    with auth_api_session() as (auth_api, _):
        info = auth_api.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code_id))
        assert info.name == user.name
        assert info.username == user.username
        assert info.avatar_url.endswith("/img/thumbnail/test_avatar.jpg")
        assert info.url == urls.invite_code_link(code=code_id)

1170 

1171 

def test_GetInviteCodeInfo_no_avatar(db):
    """
    Checks that GetInviteCodeInfo returns an empty avatar URL when the invite code's creator has no avatar, while
    still returning the creator's name, username and the invite link.
    """
    user, token = generate_user()
    code_id = "NOAVTR1"

    with session_scope() as session:
        db_user = session.execute(select(User).where(User.id == user.id)).scalar_one()
        # make sure the creator has no avatar, whatever generate_user set up
        db_user.avatar_key = None

        # use code_id rather than repeating the literal, so the stored code and the one queried cannot drift apart
        code = InviteCode(id=code_id, creator_user_id=user.id)
        session.add(code)
        session.commit()

    with auth_api_session() as (auth, _):
        res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code_id))
        assert res.name == user.name
        assert res.username == user.username
        assert res.avatar_url == ""
        assert res.url == urls.invite_code_link(code=code_id)

1190 

1191 

def test_GetInviteCodeInfo_not_found(db):
    """
    Checks that looking up an invite code that was never created fails with NOT_FOUND / INVITE_CODE_NOT_FOUND.
    """
    # a user exists, but no invite code is created for them
    generate_user()

    unknown_code_req = auth_pb2.GetInviteCodeInfoReq(code="BADCODE")
    with auth_api_session() as (auth_api, _):
        with pytest.raises(grpc.RpcError) as e:
            auth_api.GetInviteCodeInfo(unknown_code_req)
        assert e.value.code() == grpc.StatusCode.NOT_FOUND
        assert e.value.details() == errors.INVITE_CODE_NOT_FOUND

1200 

1201 

def test_SignupFlow_invite_code(db):
    """
    Checks that an invite code supplied in the basic signup step ends up on the created user.

    An existing user creates an invite code; a new signup passes that code in the basic step, confirms the email,
    and submits the account step. The resulting user's `invite_code_id` must equal the code.
    """
    inviter, token = generate_user()
    invite_code = "INV12345"
    with session_scope() as session:
        session.add(InviteCode(id=invite_code, creator_user_id=inviter.id))
        session.commit()

    with auth_api_session() as (auth_api, _):
        # Signup basic step with invite code
        res = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(
                    name="Test User",
                    email="inviteuser@example.com",
                    invite_code=invite_code,
                )
            )
        )
        flow_token = res.flow_token
        assert flow_token

        # Confirm email, reading the email token directly from the database
        with session_scope() as session:
            email_token = (
                session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one().email_token
            )

        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

        # Signup account step
        auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(
                flow_token=flow_token,
                account=auth_pb2.SignupAccount(
                    username="invited_user",
                    password="secure password",
                    birthdate="1990-01-01",
                    gender="Other",
                    hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                    city="Example City",
                    lat=1,
                    lng=5,
                    radius=100,
                    accept_tos=True,
                ),
                feedback=auth_pb2.ContributorForm(),
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

    # Check that invite_code_id is stored in the final User object
    with session_scope() as session:
        invited = session.execute(select(User).where(User.username == "invited_user")).scalar_one()
        assert invited.invite_code_id == invite_code