Coverage for app / backend / src / tests / test_auth.py: 100%

598 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import http.cookies 

2from typing import cast 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy import select, update 

8from sqlalchemy.sql import delete, func 

9 

10from couchers import urls 

11from couchers.crypto import hash_password, random_hex 

12from couchers.db import session_scope 

13from couchers.models import ( 

14 ContributeOption, 

15 ContributorForm, 

16 LoginToken, 

17 PasswordResetToken, 

18 SignupFlow, 

19 User, 

20 UserSession, 

21) 

22from couchers.proto import account_pb2, api_pb2, auth_pb2 

23from tests.fixtures.db import generate_user 

24from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email 

25from tests.fixtures.sessions import ( 

26 MetadataKeeperInterceptor, 

27 account_session, 

28 api_session, 

29 auth_api_session, 

30 real_api_session, 

31) 

32 

33 

34@pytest.fixture(autouse=True) 

35def _(testconfig, fast_passwords): 

36 pass 

37 

38 

39def get_session_cookie_tokens(metadata_interceptor: MetadataKeeperInterceptor) -> tuple[str, str]: 

40 set_cookies = [val for key, val in metadata_interceptor.latest_header_raw if key == "set-cookie"] 

41 sesh = http.cookies.SimpleCookie([v for v in set_cookies if "sesh" in v][0])["couchers-sesh"].value 

42 uid = http.cookies.SimpleCookie([v for v in set_cookies if "user-id" in v][0])["couchers-user-id"].value 

43 return sesh, uid 

44 

45 

46def test_UsernameValid(db): 

47 with auth_api_session() as (auth_api, metadata_interceptor): 

48 assert auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test")).valid 

49 

50 with auth_api_session() as (auth_api, metadata_interceptor): 

51 assert not auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="")).valid 

52 

53 

54def test_signup_incremental(db): 

55 with auth_api_session() as (auth_api, metadata_interceptor): 

56 res = auth_api.SignupFlow( 

57 auth_pb2.SignupFlowReq( 

58 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

59 ) 

60 ) 

61 

62 flow_token = res.flow_token 

63 assert res.flow_token 

64 assert not res.HasField("auth_res") 

65 assert not res.need_basic 

66 assert res.need_account 

67 assert not res.need_feedback 

68 assert res.need_verify_email 

69 assert res.need_accept_community_guidelines 

70 

71 # read out the signup token directly from the database for now 

72 with session_scope() as session: 

73 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

74 assert flow.email_sent 

75 assert not flow.email_verified 

76 email_token = flow.email_token 

77 

78 with auth_api_session() as (auth_api, metadata_interceptor): 

79 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token)) 

80 

81 assert res.flow_token == flow_token 

82 assert not res.HasField("auth_res") 

83 assert not res.need_basic 

84 assert res.need_account 

85 assert not res.need_feedback 

86 assert res.need_verify_email 

87 assert res.need_accept_community_guidelines 

88 

89 # Add feedback 

90 with auth_api_session() as (auth_api, metadata_interceptor): 

91 res = auth_api.SignupFlow( 

92 auth_pb2.SignupFlowReq( 

93 flow_token=flow_token, 

94 feedback=auth_pb2.ContributorForm( 

95 ideas="I'm a robot, incapable of original ideation", 

96 features="I love all your features", 

97 experience="I haven't done couch surfing before", 

98 contribute=auth_pb2.CONTRIBUTE_OPTION_YES, 

99 contribute_ways=["serving", "backend"], 

100 expertise="I'd love to be your server: I can compute very fast, but only simple opcodes", 

101 ), 

102 ) 

103 ) 

104 

105 assert res.flow_token == flow_token 

106 assert not res.HasField("auth_res") 

107 assert not res.need_basic 

108 assert res.need_account 

109 assert not res.need_feedback 

110 assert res.need_verify_email 

111 assert res.need_accept_community_guidelines 

112 

113 # Agree to community guidelines 

114 with auth_api_session() as (auth_api, metadata_interceptor): 

115 res = auth_api.SignupFlow( 

116 auth_pb2.SignupFlowReq( 

117 flow_token=flow_token, 

118 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

119 ) 

120 ) 

121 

122 assert res.flow_token == flow_token 

123 assert not res.HasField("auth_res") 

124 assert not res.need_basic 

125 assert res.need_account 

126 assert not res.need_feedback 

127 assert res.need_verify_email 

128 assert not res.need_accept_community_guidelines 

129 

130 # Verify email 

131 with auth_api_session() as (auth_api, metadata_interceptor): 

132 res = auth_api.SignupFlow( 

133 auth_pb2.SignupFlowReq( 

134 flow_token=flow_token, 

135 email_token=email_token, 

136 ) 

137 ) 

138 

139 assert res.flow_token == flow_token 

140 assert not res.HasField("auth_res") 

141 assert not res.need_basic 

142 assert res.need_account 

143 assert not res.need_feedback 

144 assert not res.need_verify_email 

145 assert not res.need_accept_community_guidelines 

146 

147 # Finally finish off account info 

148 with auth_api_session() as (auth_api, metadata_interceptor): 

149 res = auth_api.SignupFlow( 

150 auth_pb2.SignupFlowReq( 

151 flow_token=flow_token, 

152 account=auth_pb2.SignupAccount( 

153 username="frodo", 

154 password="a very insecure password", 

155 birthdate="1970-01-01", 

156 gender="Bot", 

157 hosting_status=api_pb2.HOSTING_STATUS_MAYBE, 

158 city="New York City", 

159 lat=40.7331, 

160 lng=-73.9778, 

161 radius=500, 

162 accept_tos=True, 

163 ), 

164 ) 

165 ) 

166 

167 assert not res.flow_token 

168 assert res.HasField("auth_res") 

169 assert res.auth_res.user_id 

170 assert not res.auth_res.jailed 

171 assert not res.need_basic 

172 assert not res.need_account 

173 assert not res.need_feedback 

174 assert not res.need_verify_email 

175 assert not res.need_accept_community_guidelines 

176 

177 user_id = res.auth_res.user_id 

178 

179 sess_token, uid = get_session_cookie_tokens(metadata_interceptor) 

180 assert uid == str(user_id) 

181 

182 with api_session(sess_token) as api: 

183 res = api.GetUser(api_pb2.GetUserReq(user=str(user_id))) 

184 

185 assert res.username == "frodo" 

186 assert res.gender == "Bot" 

187 assert res.hosting_status == api_pb2.HOSTING_STATUS_MAYBE 

188 assert res.city == "New York City" 

189 assert res.lat == 40.7331 

190 assert res.lng == -73.9778 

191 assert res.radius == 500 

192 

193 with session_scope() as session: 

194 form = session.execute(select(ContributorForm)).scalar_one() 

195 

196 assert form.ideas == "I'm a robot, incapable of original ideation" 

197 assert form.features == "I love all your features" 

198 assert form.experience == "I haven't done couch surfing before" 

199 assert form.contribute == ContributeOption.yes 

200 assert form.contribute_ways == ["serving", "backend"] 

201 assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes" 

202 

203 

204def _quick_signup() -> int: 

205 with auth_api_session() as (auth_api, metadata_interceptor): 

206 res = auth_api.SignupFlow( 

207 auth_pb2.SignupFlowReq( 

208 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

209 account=auth_pb2.SignupAccount( 

210 username="frodo", 

211 password="a very insecure password", 

212 birthdate="1970-01-01", 

213 gender="Bot", 

214 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

215 city="New York City", 

216 lat=40.7331, 

217 lng=-73.9778, 

218 radius=500, 

219 accept_tos=True, 

220 ), 

221 feedback=auth_pb2.ContributorForm(), 

222 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

223 ) 

224 ) 

225 

226 flow_token = res.flow_token 

227 

228 assert res.flow_token 

229 assert not res.HasField("auth_res") 

230 assert not res.need_basic 

231 assert not res.need_account 

232 assert not res.need_feedback 

233 assert res.need_verify_email 

234 

235 # read out the signup token directly from the database for now 

236 with session_scope() as session: 

237 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

238 assert flow.email_sent 

239 assert not flow.email_verified 

240 email_token = flow.email_token 

241 

242 with auth_api_session() as (auth_api, metadata_interceptor): 

243 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

244 

245 assert not res.flow_token 

246 assert res.HasField("auth_res") 

247 assert res.auth_res.user_id 

248 assert not res.auth_res.jailed 

249 assert not res.need_basic 

250 assert not res.need_account 

251 assert not res.need_feedback 

252 assert not res.need_verify_email 

253 

254 # make sure we got the right token in a cookie 

255 with session_scope() as session: 

256 token = session.execute( 

257 select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo") 

258 ).scalar_one() 

259 sesh, uid = get_session_cookie_tokens(metadata_interceptor) 

260 assert sesh == token 

261 

262 return cast(int, res.auth_res.user_id) 

263 

264 

265def test_signup(db): 

266 _quick_signup() 

267 

268 

269def test_basic_login(db): 

270 # Create our test user using signup 

271 _quick_signup() 

272 

273 with auth_api_session() as (auth_api, metadata_interceptor): 

274 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

275 

276 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

277 

278 with session_scope() as session: 

279 token = session.execute( 

280 select(UserSession.token) 

281 .join(User, UserSession.user_id == User.id) 

282 .where(User.username == "frodo") 

283 .where(UserSession.token == reply_token) 

284 .where(UserSession.is_valid) 

285 ).scalar_one_or_none() 

286 assert token 

287 

288 # log out 

289 with auth_api_session() as (auth_api, metadata_interceptor): 

290 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),)) 

291 

292 

293def test_login_part_signed_up_verified_email(db): 

294 """ 

295 If you try to log in but didn't finish singing up, we send you a new email and ask you to finish signing up. 

296 """ 

297 with auth_api_session() as (auth_api, metadata_interceptor): 

298 res = auth_api.SignupFlow( 

299 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")) 

300 ) 

301 

302 flow_token = res.flow_token 

303 assert res.need_verify_email 

304 

305 # verify the email 

306 with session_scope() as session: 

307 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

308 flow_token = flow.flow_token 

309 email_token = flow.email_token 

310 with auth_api_session() as (auth_api, metadata_interceptor): 

311 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

312 

313 with mock_notification_email() as mock: 

314 with auth_api_session() as (auth_api, metadata_interceptor): 

315 with pytest.raises(grpc.RpcError) as err: 

316 auth_api.Authenticate(auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd")) 

317 assert err.value.details() == "Please check your email for a link to continue signing up." 

318 

319 assert mock.call_count == 1 

320 e = email_fields(mock) 

321 assert e.recipient == "email@couchers.org.invalid" 

322 assert flow_token in e.plain 

323 assert flow_token in e.html 

324 

325 

326def test_login_part_signed_up_not_verified_email(db): 

327 with auth_api_session() as (auth_api, metadata_interceptor): 

328 res = auth_api.SignupFlow( 

329 auth_pb2.SignupFlowReq( 

330 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

331 account=auth_pb2.SignupAccount( 

332 username="frodo", 

333 password="a very insecure password", 

334 birthdate="1999-01-01", 

335 gender="Bot", 

336 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

337 city="New York City", 

338 lat=40.7331, 

339 lng=-73.9778, 

340 radius=500, 

341 accept_tos=True, 

342 ), 

343 ) 

344 ) 

345 

346 flow_token = res.flow_token 

347 assert res.need_verify_email 

348 

349 with mock_notification_email() as mock: 

350 with auth_api_session() as (auth_api, metadata_interceptor): 

351 with pytest.raises(grpc.RpcError) as err: 

352 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd")) 

353 assert err.value.details() == "Please check your email for a link to continue signing up." 

354 

355 with session_scope() as session: 

356 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

357 email_token = flow.email_token 

358 

359 assert mock.call_count == 1 

360 e = email_fields(mock) 

361 assert e.recipient == "email@couchers.org.invalid" 

362 assert email_token 

363 assert email_token in e.plain 

364 assert email_token in e.html 

365 

366 

367def test_banned_user(db): 

368 _quick_signup() 

369 

370 with session_scope() as session: 

371 session.execute(select(User)).scalar_one().is_banned = True 

372 

373 with auth_api_session() as (auth_api, metadata_interceptor): 

374 with pytest.raises(grpc.RpcError) as e: 

375 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

376 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

377 assert e.value.details() == "Your account is suspended." 

378 

379 

380def test_deleted_user(db): 

381 user_id = _quick_signup() 

382 

383 with session_scope() as session: 

384 session.execute(update(User).where(User.id == user_id).values(is_deleted=True)) 

385 

386 with auth_api_session() as (auth_api, metadata_interceptor): 

387 with pytest.raises(grpc.RpcError) as e: 

388 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

389 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

390 assert e.value.details() == "An account with that username or email was not found." 

391 

392 

393def test_invalid_token(db): 

394 user1, token1 = generate_user() 

395 user2, token2 = generate_user() 

396 

397 wrong_token = random_hex(32) 

398 

399 with real_api_session(wrong_token) as api, pytest.raises(grpc.RpcError) as e: 

400 res = api.GetUser(api_pb2.GetUserReq(user=user2.username)) 

401 

402 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED 

403 assert e.value.details() == "Unauthorized" 

404 

405 

406def test_password_reset_v2(db, push_collector: PushCollector): 

407 user, token = generate_user(hashed_password=hash_password("mypassword")) 

408 

409 with auth_api_session() as (auth_api, metadata_interceptor): 

410 with mock_notification_email() as mock: 

411 res = auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username)) 

412 

413 with session_scope() as session: 

414 password_reset_token = session.execute(select(PasswordResetToken.token)).scalar_one() 

415 

416 assert mock.call_count == 1 

417 e = email_fields(mock) 

418 assert e.recipient == user.email 

419 assert "reset" in e.subject.lower() 

420 assert password_reset_token in e.plain 

421 assert password_reset_token in e.html 

422 unique_string = "You asked for your password to be reset on Couchers.org." 

423 assert unique_string in e.plain 

424 assert unique_string in e.html 

425 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.plain 

426 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.html 

427 assert "support@couchers.org" in e.plain 

428 assert "support@couchers.org" in e.html 

429 

430 push = push_collector.pop_for_user(user.id, last=True) 

431 assert push.content.title == "Password reset requested" 

432 assert push.content.body == "Use the link we sent by email to complete it." 

433 

434 # make sure bad password are caught 

435 with auth_api_session() as (auth_api, metadata_interceptor): 

436 with pytest.raises(grpc.RpcError) as err: 

437 auth_api.CompletePasswordResetV2( 

438 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password") 

439 ) 

440 assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

441 assert err.value.details() == "The password is insecure. Please use one that is not easily guessable." 

442 

443 # make sure we can set a good password 

444 with auth_api_session() as (auth_api, metadata_interceptor): 

445 pwd = random_hex() 

446 with mock_notification_email() as mock: 

447 auth_api.CompletePasswordResetV2( 

448 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd) 

449 ) 

450 

451 push = push_collector.pop_for_user(user.id, last=True) 

452 assert push.content.title == "Password reset" 

453 assert push.content.body == "Your password was successfully reset." 

454 

455 session_token, _ = get_session_cookie_tokens(metadata_interceptor) 

456 

457 with session_scope() as session: 

458 other_session_token = session.execute( 

459 select(UserSession.token) 

460 .join(User, UserSession.user_id == User.id) 

461 .where(User.username == user.username) 

462 .where(UserSession.token == session_token) 

463 .where(UserSession.is_valid) 

464 ).scalar_one_or_none() 

465 assert other_session_token 

466 

467 # make sure we can't set a password again 

468 with auth_api_session() as (auth_api, metadata_interceptor): 

469 with pytest.raises(grpc.RpcError) as err: 

470 auth_api.CompletePasswordResetV2( 

471 auth_pb2.CompletePasswordResetV2Req( 

472 password_reset_token=password_reset_token, new_password=random_hex() 

473 ) 

474 ) 

475 assert err.value.code() == grpc.StatusCode.NOT_FOUND 

476 assert err.value.details() == "Invalid token." 

477 

478 with session_scope() as session: 

479 user = session.execute(select(User)).scalar_one() 

480 assert user.hashed_password == hash_password(pwd) 

481 

482 

483def test_password_reset_no_such_user(db): 

484 user, token = generate_user() 

485 

486 with auth_api_session() as (auth_api, metadata_interceptor): 

487 res = auth_api.ResetPassword( 

488 auth_pb2.ResetPasswordReq( 

489 user="nonexistentuser", 

490 ) 

491 ) 

492 

493 with session_scope() as session: 

494 assert session.execute(select(PasswordResetToken)).scalar_one_or_none() is None 

495 

496 

497def test_password_reset_invalid_token_v2(db): 

498 password = random_hex() 

499 user, token = generate_user(hashed_password=hash_password(password)) 

500 

501 with auth_api_session() as (auth_api, metadata_interceptor): 

502 res = auth_api.ResetPassword( 

503 auth_pb2.ResetPasswordReq( 

504 user=user.username, 

505 ) 

506 ) 

507 

508 with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e: 

509 res = auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken")) 

510 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

511 assert e.value.details() == "Invalid token." 

512 

513 with session_scope() as session: 

514 user = session.execute(select(User)).scalar_one() 

515 assert user.hashed_password == hash_password(password) 

516 

517 

518def test_logout_invalid_token(db): 

519 # Create our test user using signup 

520 _quick_signup() 

521 

522 with auth_api_session() as (auth_api, metadata_interceptor): 

523 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

524 

525 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

526 

527 # delete all login tokens 

528 with session_scope() as session: 

529 session.execute(delete(LoginToken)) 

530 

531 # log out with non-existent token should still return a valid result 

532 with auth_api_session() as (auth_api, metadata_interceptor): 

533 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),)) 

534 

535 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

536 # make sure we set an empty cookie 

537 assert reply_token == "" 

538 

539 

540def test_signup_without_password(db): 

541 with auth_api_session() as (auth_api, metadata_interceptor): 

542 with pytest.raises(grpc.RpcError) as e: 

543 auth_api.SignupFlow( 

544 auth_pb2.SignupFlowReq( 

545 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"), 

546 account=auth_pb2.SignupAccount( 

547 username="frodo", 

548 password="bad", 

549 city="Minas Tirith", 

550 birthdate="9999-12-31", # arbitrary future birthdate 

551 gender="Robot", 

552 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

553 lat=1, 

554 lng=1, 

555 radius=100, 

556 accept_tos=True, 

557 ), 

558 feedback=auth_pb2.ContributorForm(), 

559 ) 

560 ) 

561 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

562 assert e.value.details() == "The password must be 8 or more characters long." 

563 

564 

565def test_signup_invalid_birthdate(db): 

566 with auth_api_session() as (auth_api, metadata_interceptor): 

567 with pytest.raises(grpc.RpcError) as e: 

568 auth_api.SignupFlow( 

569 auth_pb2.SignupFlowReq( 

570 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"), 

571 account=auth_pb2.SignupAccount( 

572 username="frodo", 

573 password="a very insecure password", 

574 city="Minas Tirith", 

575 birthdate="9999-12-31", # arbitrary future birthdate 

576 gender="Robot", 

577 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

578 lat=1, 

579 lng=1, 

580 radius=100, 

581 accept_tos=True, 

582 ), 

583 feedback=auth_pb2.ContributorForm(), 

584 ) 

585 ) 

586 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

587 assert e.value.details() == "You must be at least 18 years old to sign up." 

588 

589 res = auth_api.SignupFlow( 

590 auth_pb2.SignupFlowReq( 

591 basic=auth_pb2.SignupBasic(name="Christopher", email="a2@b.com"), 

592 account=auth_pb2.SignupAccount( 

593 username="ceelo", 

594 password="a very insecure password", 

595 city="New York City", 

596 birthdate="2000-12-31", # arbitrary birthdate older than 18 years 

597 gender="Helicopter", 

598 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

599 lat=1, 

600 lng=1, 

601 radius=100, 

602 accept_tos=True, 

603 ), 

604 feedback=auth_pb2.ContributorForm(), 

605 ) 

606 ) 

607 

608 assert res.flow_token 

609 

610 with pytest.raises(grpc.RpcError) as e: 

611 auth_api.SignupFlow( 

612 auth_pb2.SignupFlowReq( 

613 basic=auth_pb2.SignupBasic(name="Franklin", email="a3@b.com"), 

614 account=auth_pb2.SignupAccount( 

615 username="franklin", 

616 password="a very insecure password", 

617 city="Los Santos", 

618 birthdate="2010-04-09", # arbitrary birthdate < 18 yrs 

619 gender="Male", 

620 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

621 lat=1, 

622 lng=1, 

623 radius=100, 

624 accept_tos=True, 

625 ), 

626 feedback=auth_pb2.ContributorForm(), 

627 ) 

628 ) 

629 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

630 assert e.value.details() == "You must be at least 18 years old to sign up." 

631 

632 with session_scope() as session: 

633 assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1 

634 

635 

636def test_signup_invalid_email(db): 

637 with auth_api_session() as (auth_api, metadata_interceptor): 

638 with pytest.raises(grpc.RpcError) as e: 

639 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a"))) 

640 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

641 assert e.value.details() == "Invalid email." 

642 

643 with auth_api_session() as (auth_api, metadata_interceptor): 

644 with pytest.raises(grpc.RpcError) as e: 

645 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b"))) 

646 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

647 assert e.value.details() == "Invalid email." 

648 

649 with auth_api_session() as (auth_api, metadata_interceptor): 

650 with pytest.raises(grpc.RpcError) as e: 

651 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b."))) 

652 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

653 assert e.value.details() == "Invalid email." 

654 

655 with auth_api_session() as (auth_api, metadata_interceptor): 

656 with pytest.raises(grpc.RpcError) as e: 

657 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b.c"))) 

658 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

659 assert e.value.details() == "Invalid email." 

660 

661 

662def test_signup_existing_email(db): 

663 # Signed up user 

664 user, _ = generate_user() 

665 

666 with auth_api_session() as (auth_api, metadata_interceptor): 

667 with pytest.raises(grpc.RpcError) as e: 

668 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=user.email))) 

669 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

670 assert e.value.details() == "That email address is already associated with an account. Please log in instead!" 

671 

672 

673def test_signup_banned_user_email(db): 

674 user, _ = generate_user() 

675 

676 with session_scope() as session: 

677 session.execute(update(User).where(User.id == user.id).values(is_banned=True)) 

678 

679 with auth_api_session() as (auth_api, _): 

680 with pytest.raises(grpc.RpcError) as e: 

681 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email))) 

682 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

683 assert e.value.details() == "You cannot sign up with that email address." 

684 

685 

686def test_signup_deleted_user_email(db): 

687 user, _ = generate_user() 

688 

689 with session_scope() as session: 

690 session.execute(update(User).where(User.id == user.id).values(is_deleted=True)) 

691 

692 with auth_api_session() as (auth_api, _): 

693 with pytest.raises(grpc.RpcError) as e: 

694 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email))) 

695 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

696 assert e.value.details() == "You cannot sign up with that email address." 

697 

698 

699def test_signup_continue_with_email(db): 

700 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

701 with auth_api_session() as (auth_api, metadata_interceptor): 

702 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

703 flow_token = res.flow_token 

704 assert flow_token 

705 

706 # continue with same email, should just send another email to the user 

707 with auth_api_session() as (auth_api, metadata_interceptor): 

708 with pytest.raises(grpc.RpcError) as e: 

709 res = auth_api.SignupFlow( 

710 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)) 

711 ) 

712 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

713 assert e.value.details() == "Please check your email for a link to continue signing up." 

714 

715 

716def test_signup_resend_email(db): 

717 with auth_api_session() as (auth_api, metadata_interceptor): 

718 with mock_notification_email() as mock: 

719 res = auth_api.SignupFlow( 

720 auth_pb2.SignupFlowReq( 

721 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

722 account=auth_pb2.SignupAccount( 

723 username="frodo", 

724 password="a very insecure password", 

725 birthdate="1970-01-01", 

726 gender="Bot", 

727 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

728 city="New York City", 

729 lat=40.7331, 

730 lng=-73.9778, 

731 radius=500, 

732 accept_tos=True, 

733 ), 

734 feedback=auth_pb2.ContributorForm(), 

735 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

736 ) 

737 ) 

738 assert mock.call_count == 1 

739 e = email_fields(mock) 

740 assert e.recipient == "email@couchers.org.invalid" 

741 

742 flow_token = res.flow_token 

743 assert flow_token 

744 

745 with session_scope() as session: 

746 flow = session.execute(select(SignupFlow)).scalar_one() 

747 assert flow.flow_token == flow_token 

748 assert flow.email_sent 

749 assert not flow.email_verified 

750 email_token = flow.email_token 

751 

752 # ask for a new signup email 

753 with auth_api_session() as (auth_api, metadata_interceptor): 

754 with mock_notification_email() as mock: 

755 res = auth_api.SignupFlow( 

756 auth_pb2.SignupFlowReq( 

757 flow_token=flow_token, 

758 resend_verification_email=True, 

759 ) 

760 ) 

761 assert mock.call_count == 1 

762 e = email_fields(mock) 

763 assert e.recipient == "email@couchers.org.invalid" 

764 assert email_token 

765 assert email_token in e.plain 

766 assert email_token in e.html 

767 

768 with session_scope() as session: 

769 flow = session.execute(select(SignupFlow)).scalar_one() 

770 assert not flow.email_verified 

771 

772 with auth_api_session() as (auth_api, metadata_interceptor): 

773 res = auth_api.SignupFlow( 

774 auth_pb2.SignupFlowReq( 

775 email_token=email_token, 

776 ) 

777 ) 

778 

779 assert not res.flow_token 

780 assert res.HasField("auth_res") 

781 

782 

783def test_successful_authenticate(db): 

784 user, _ = generate_user(hashed_password=hash_password("password")) 

785 

786 # Authenticate with username 

787 with auth_api_session() as (auth_api, metadata_interceptor): 

788 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="password")) 

789 assert not reply.jailed 

790 

791 # Authenticate with email 

792 with auth_api_session() as (auth_api, metadata_interceptor): 

793 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.email, password="password")) 

794 assert not reply.jailed 

795 

796 

797def test_unsuccessful_authenticate(db): 

798 user, _ = generate_user(hashed_password=hash_password("password")) 

799 

800 # Invalid password 

801 with auth_api_session() as (auth_api, metadata_interceptor): 

802 with pytest.raises(grpc.RpcError) as e: 

803 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="incorrectpassword")) 

804 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

805 assert e.value.details() == "Wrong password." 

806 

807 # Invalid username 

808 with auth_api_session() as (auth_api, metadata_interceptor): 

809 with pytest.raises(grpc.RpcError) as e: 

810 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="notarealusername", password="password")) 

811 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

812 assert e.value.details() == "An account with that username or email was not found." 

813 

814 # Invalid email 

815 with auth_api_session() as (auth_api, metadata_interceptor): 

816 with pytest.raises(grpc.RpcError) as e: 

817 reply = auth_api.Authenticate( 

818 auth_pb2.AuthReq(user=f"{random_hex(12)}@couchers.org.invalid", password="password") 

819 ) 

820 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

821 assert e.value.details() == "An account with that username or email was not found." 

822 

823 # Invalid id 

824 with auth_api_session() as (auth_api, metadata_interceptor): 

825 with pytest.raises(grpc.RpcError) as e: 

826 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="-1", password="password")) 

827 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

828 assert e.value.details() == "An account with that username or email was not found." 

829 

830 

831def test_complete_signup(db): 

832 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

833 with auth_api_session() as (auth_api, metadata_interceptor): 

834 reply = auth_api.SignupFlow( 

835 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email)) 

836 ) 

837 

838 flow_token = reply.flow_token 

839 

840 with auth_api_session() as (auth_api, metadata_interceptor): 

841 # Invalid username 

842 with pytest.raises(grpc.RpcError) as e: 

843 auth_api.SignupFlow( 

844 auth_pb2.SignupFlowReq( 

845 flow_token=flow_token, 

846 account=auth_pb2.SignupAccount( 

847 username=" ", 

848 password="a very insecure password", 

849 city="Minas Tirith", 

850 birthdate="1980-12-31", 

851 gender="Robot", 

852 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

853 lat=1, 

854 lng=1, 

855 radius=100, 

856 accept_tos=True, 

857 ), 

858 ) 

859 ) 

860 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

861 assert e.value.details() == "Invalid username." 

862 

863 with auth_api_session() as (auth_api, metadata_interceptor): 

864 # Invalid name 

865 with pytest.raises(grpc.RpcError) as e: 

866 auth_api.SignupFlow( 

867 auth_pb2.SignupFlowReq( 

868 basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid") 

869 ) 

870 ) 

871 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

872 assert e.value.details() == "Name not supported." 

873 

874 with auth_api_session() as (auth_api, metadata_interceptor): 

875 # Hosting status required 

876 with pytest.raises(grpc.RpcError) as e: 

877 auth_api.SignupFlow( 

878 auth_pb2.SignupFlowReq( 

879 flow_token=flow_token, 

880 account=auth_pb2.SignupAccount( 

881 username="frodo", 

882 password="a very insecure password", 

883 city="Minas Tirith", 

884 birthdate="1980-12-31", 

885 gender="Robot", 

886 hosting_status=None, 

887 lat=1, 

888 lng=1, 

889 radius=100, 

890 accept_tos=True, 

891 ), 

892 ) 

893 ) 

894 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

895 assert e.value.details() == "Hosting status is required." 

896 

897 user, _ = generate_user() 

898 with auth_api_session() as (auth_api, metadata_interceptor): 

899 # Username unavailable 

900 with pytest.raises(grpc.RpcError) as e: 

901 auth_api.SignupFlow( 

902 auth_pb2.SignupFlowReq( 

903 flow_token=flow_token, 

904 account=auth_pb2.SignupAccount( 

905 username=user.username, 

906 password="a very insecure password", 

907 city="Minas Tirith", 

908 birthdate="1980-12-31", 

909 gender="Robot", 

910 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

911 lat=1, 

912 lng=1, 

913 radius=100, 

914 accept_tos=True, 

915 ), 

916 ) 

917 ) 

918 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

919 assert e.value.details() == "Sorry, that username isn't available." 

920 

921 with auth_api_session() as (auth_api, metadata_interceptor): 

922 # Invalid coordinate 

923 with pytest.raises(grpc.RpcError) as e: 

924 auth_api.SignupFlow( 

925 auth_pb2.SignupFlowReq( 

926 flow_token=flow_token, 

927 account=auth_pb2.SignupAccount( 

928 username="frodo", 

929 password="a very insecure password", 

930 city="Minas Tirith", 

931 birthdate="1980-12-31", 

932 gender="Robot", 

933 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

934 lat=0, 

935 lng=0, 

936 radius=100, 

937 accept_tos=True, 

938 ), 

939 ) 

940 ) 

941 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

942 assert e.value.details() == "Invalid coordinate." 

943 

944 

945def test_signup_token_regression(db): 

946 # Repro steps: 

947 # 1. Start a signup 

948 # 2. Confirm the email 

949 # 3. Start a new signup with the same email 

950 # Expected: send a link to the email to continue signing up. 

951 # Actual: `AttributeError: 'SignupFlow' object has no attribute 'token'` 

952 

953 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

954 

955 # 1. Start a signup 

956 with auth_api_session() as (auth_api, metadata_interceptor): 

957 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

958 flow_token = res.flow_token 

959 assert flow_token 

960 

961 # 2. Confirm the email 

962 with session_scope() as session: 

963 email_token = session.execute( 

964 select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token) 

965 ).scalar_one() 

966 

967 with auth_api_session() as (auth_api, metadata_interceptor): 

968 auth_api.SignupFlow( 

969 auth_pb2.SignupFlowReq( 

970 flow_token=flow_token, 

971 email_token=email_token, 

972 ) 

973 ) 

974 

975 # 3. Start a new signup with the same email 

976 with auth_api_session() as (auth_api, metadata_interceptor): 

977 with pytest.raises(grpc.RpcError) as e: 

978 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

979 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

980 assert e.value.details() == "Please check your email for a link to continue signing up." 

981 

982 

983@pytest.mark.parametrize("opt_out", [True, False]) 

984def test_opt_out_of_newsletter(db, opt_out): 

985 with auth_api_session() as (auth_api, metadata_interceptor): 

986 res = auth_api.SignupFlow( 

987 auth_pb2.SignupFlowReq( 

988 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

989 account=auth_pb2.SignupAccount( 

990 username="frodo", 

991 password="a very insecure password", 

992 birthdate="1970-01-01", 

993 gender="Bot", 

994 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

995 city="New York City", 

996 lat=40.7331, 

997 lng=-73.9778, 

998 radius=500, 

999 accept_tos=True, 

1000 opt_out_of_newsletter=opt_out, 

1001 ), 

1002 feedback=auth_pb2.ContributorForm(), 

1003 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1004 ) 

1005 ) 

1006 

1007 with session_scope() as session: 

1008 email_token = session.execute( 

1009 select(SignupFlow.email_token).where(SignupFlow.flow_token == res.flow_token) 

1010 ).scalar_one() 

1011 

1012 with auth_api_session() as (auth_api, metadata_interceptor): 

1013 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1014 

1015 user_id = res.auth_res.user_id 

1016 

1017 with session_scope() as session: 

1018 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

1019 assert not user.in_sync_with_newsletter 

1020 assert user.opt_out_of_newsletter == opt_out 

1021 

1022 

1023def test_GetAuthState(db): 

1024 user, token = generate_user() 

1025 jailed_user, jailed_token = generate_user(accepted_tos=0) 

1026 

1027 with auth_api_session() as (auth_api, metadata_interceptor): 

1028 res = auth_api.GetAuthState(empty_pb2.Empty()) 

1029 assert not res.logged_in 

1030 assert not res.HasField("auth_res") 

1031 

1032 with auth_api_session() as (auth_api, metadata_interceptor): 

1033 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1034 assert res.logged_in 

1035 assert res.HasField("auth_res") 

1036 assert res.auth_res.user_id == user.id 

1037 assert not res.auth_res.jailed 

1038 

1039 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1040 

1041 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1042 assert not res.logged_in 

1043 assert not res.HasField("auth_res") 

1044 

1045 with auth_api_session() as (auth_api, metadata_interceptor): 

1046 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={jailed_token}"),)) 

1047 assert res.logged_in 

1048 assert res.HasField("auth_res") 

1049 assert res.auth_res.user_id == jailed_user.id 

1050 assert res.auth_res.jailed 

1051 

1052 

1053def test_signup_no_feedback_regression(db): 

1054 """ 

1055 When we first remove the feedback form, the backned was saying it's not needed but was not completing the signup, 

1056 this regression test checks that. 

1057 """ 

1058 with auth_api_session() as (auth_api, metadata_interceptor): 

1059 res = auth_api.SignupFlow( 

1060 auth_pb2.SignupFlowReq( 

1061 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

1062 account=auth_pb2.SignupAccount( 

1063 username="frodo", 

1064 password="a very insecure password", 

1065 birthdate="1970-01-01", 

1066 gender="Bot", 

1067 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1068 city="New York City", 

1069 lat=40.7331, 

1070 lng=-73.9778, 

1071 radius=500, 

1072 accept_tos=True, 

1073 ), 

1074 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1075 ) 

1076 ) 

1077 

1078 flow_token = res.flow_token 

1079 

1080 assert res.flow_token 

1081 assert not res.HasField("auth_res") 

1082 assert not res.need_basic 

1083 assert not res.need_account 

1084 assert not res.need_feedback 

1085 assert res.need_verify_email 

1086 

1087 # read out the signup token directly from the database for now 

1088 with session_scope() as session: 

1089 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

1090 assert flow.email_sent 

1091 assert not flow.email_verified 

1092 email_token = flow.email_token 

1093 

1094 with auth_api_session() as (auth_api, metadata_interceptor): 

1095 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1096 

1097 assert not res.flow_token 

1098 assert res.HasField("auth_res") 

1099 assert res.auth_res.user_id 

1100 assert not res.auth_res.jailed 

1101 assert not res.need_basic 

1102 assert not res.need_account 

1103 assert not res.need_feedback 

1104 assert not res.need_verify_email 

1105 

1106 # make sure we got the right token in a cookie 

1107 with session_scope() as session: 

1108 token = session.execute( 

1109 select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo") 

1110 ).scalar_one() 

1111 sesh, uid = get_session_cookie_tokens(metadata_interceptor) 

1112 assert sesh == token 

1113 

1114 

1115def test_banned_username(db): 

1116 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

1117 with auth_api_session() as (auth_api, metadata_interceptor): 

1118 reply = auth_api.SignupFlow( 

1119 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email)) 

1120 ) 

1121 

1122 flow_token = reply.flow_token 

1123 

1124 with auth_api_session() as (auth_api, metadata_interceptor): 

1125 # Banned username 

1126 with pytest.raises(grpc.RpcError) as e: 

1127 auth_api.SignupFlow( 

1128 auth_pb2.SignupFlowReq( 

1129 flow_token=flow_token, 

1130 account=auth_pb2.SignupAccount( 

1131 username="thecouchersadminaccount", 

1132 password="a very insecure password", 

1133 city="Minas Tirith", 

1134 birthdate="1980-12-31", 

1135 gender="Robot", 

1136 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1137 lat=1, 

1138 lng=1, 

1139 radius=100, 

1140 accept_tos=True, 

1141 ), 

1142 ) 

1143 ) 

1144 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1145 assert e.value.details() == "Sorry, that username isn't available." 

1146 

1147 

1148# tests for ConfirmChangeEmail within test_account.py tests for test_ChangeEmail_* 

1149 

1150 

1151def test_GetInviteCodeInfo(db): 

1152 user, token = generate_user(complete_profile=True) 

1153 

1154 with account_session(token) as account: 

1155 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code 

1156 

1157 with auth_api_session() as (auth, _): 

1158 res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code)) 

1159 assert res.name == user.name 

1160 assert res.username == user.username 

1161 # Avatar URL should be a thumbnail URL with a hashed filename 

1162 assert "/img/thumbnail/" in res.avatar_url 

1163 assert res.avatar_url.endswith(".jpg") 

1164 # Verify the hashed filename looks correct (64 char hex hash) 

1165 assert len(res.avatar_url.split("/")[-1].replace(".jpg", "")) == 64 

1166 assert res.url == urls.invite_code_link(code=code) 

1167 

1168 

1169def test_GetInviteCodeInfo_no_avatar(db): 

1170 user, token = generate_user(complete_profile=False) 

1171 

1172 with account_session(token) as account: 

1173 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code 

1174 

1175 with auth_api_session() as (auth, _): 

1176 res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code)) 

1177 assert res.name == user.name 

1178 assert res.username == user.username 

1179 assert res.avatar_url == "" 

1180 assert res.url == urls.invite_code_link(code=code) 

1181 

1182 

1183def test_GetInviteCodeInfo_not_found(db): 

1184 generate_user() 

1185 

1186 with auth_api_session() as (auth, _): 

1187 with pytest.raises(grpc.RpcError) as e: 

1188 auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code="BADCODE")) 

1189 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1190 assert e.value.details() == "Invite code not found." 

1191 

1192 

1193def test_SignupFlow_invite_code(db): 

1194 user, token = generate_user() 

1195 

1196 with account_session(token) as account: 

1197 invite_code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code 

1198 

1199 with auth_api_session() as (auth_api, _): 

1200 # Signup basic step with invite code 

1201 res = auth_api.SignupFlow( 

1202 auth_pb2.SignupFlowReq( 

1203 basic=auth_pb2.SignupBasic( 

1204 name="Test User", 

1205 email="inviteuser@example.com", 

1206 invite_code=invite_code, 

1207 ) 

1208 ) 

1209 ) 

1210 flow_token = res.flow_token 

1211 assert flow_token 

1212 

1213 # Confirm email 

1214 with session_scope() as session: 

1215 email_token = session.execute( 

1216 select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token) 

1217 ).scalar_one() 

1218 

1219 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1220 

1221 # Signup account step 

1222 auth_api.SignupFlow( 

1223 auth_pb2.SignupFlowReq( 

1224 flow_token=flow_token, 

1225 account=auth_pb2.SignupAccount( 

1226 username="invited_user", 

1227 password="secure password", 

1228 birthdate="1990-01-01", 

1229 gender="Other", 

1230 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1231 city="Example City", 

1232 lat=1, 

1233 lng=5, 

1234 radius=100, 

1235 accept_tos=True, 

1236 ), 

1237 feedback=auth_pb2.ContributorForm(), 

1238 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1239 ) 

1240 ) 

1241 

1242 # Check that invite_code_id is stored in the final User object 

1243 with session_scope() as session: 

1244 invite_code_id = session.execute( 

1245 select(User.invite_code_id).where(User.username == "invited_user") 

1246 ).scalar_one() 

1247 assert invite_code_id == invite_code