Coverage for app/backend/src/tests/test_auth.py: 100%

733 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import http.cookies 

2from typing import cast 

3from unittest.mock import DEFAULT, patch 

4 

5import grpc 

6import pytest 

7from google.protobuf import empty_pb2, wrappers_pb2 

8from sqlalchemy import select, update 

9from sqlalchemy.sql import delete, func 

10 

11from couchers import urls 

12from couchers.crypto import hash_password, random_hex 

13from couchers.db import session_scope 

14from couchers.models import ( 

15 ContributeOption, 

16 ContributorForm, 

17 LoginToken, 

18 NonvisibleUserAccess, 

19 NonvisibleUserAccessType, 

20 NonvisibleUserState, 

21 PasswordResetToken, 

22 SignupFlow, 

23 User, 

24 UserSession, 

25) 

26from couchers.proto import account_pb2, api_pb2, auth_pb2 

27from couchers.utils import now 

28from tests.fixtures.db import generate_user 

29from tests.fixtures.misc import EmailCollector, PushCollector 

30from tests.fixtures.sessions import ( 

31 MetadataKeeperInterceptor, 

32 account_session, 

33 api_session, 

34 auth_api_session, 

35 real_api_session, 

36) 

37 

38 

@pytest.fixture(autouse=True)
def _(testconfig, fast_passwords):
    """Request the ``testconfig`` and ``fast_passwords`` fixtures for every test in this module."""

42 

43 

def get_session_cookie_tokens(metadata_interceptor: MetadataKeeperInterceptor) -> tuple[str, str]:
    """
    Extract the session token and user id from the latest ``set-cookie`` response headers.

    Reads ``metadata_interceptor.latest_header_raw`` (an iterable of ``(key, value)`` pairs), parses every
    ``set-cookie`` value, and returns ``(couchers-sesh, couchers-user-id)`` cookie values.

    Cookies are looked up by name rather than by substring match on the raw header, so an unrelated cookie
    whose text happens to contain "sesh" or "user-id" cannot be picked by mistake. If a cookie name is set
    more than once, the first occurrence wins.

    Raises ``KeyError`` (naming the missing cookie) if either cookie was not set.
    """
    found: dict[str, str] = {}
    for key, raw in metadata_interceptor.latest_header_raw:
        if key != "set-cookie":
            continue
        for name, morsel in http.cookies.SimpleCookie(raw).items():
            # setdefault keeps the first value seen for each cookie name
            found.setdefault(name, morsel.value)
    return found["couchers-sesh"], found["couchers-user-id"]

49 

50 

def test_UsernameValid(db):
    """UsernameValid reports "test" as valid and the empty string as not valid."""
    expectations = (("test", True), ("", False))
    for username, should_be_valid in expectations:
        # each probe goes through its own fresh API session
        with auth_api_session() as (auth_api, _interceptor):
            reply = auth_api.UsernameValid(auth_pb2.UsernameValidReq(username=username))
            assert bool(reply.valid) is should_be_valid

57 

58 

def test_signup_incremental(db):
    """Send the signup flow one piece at a time and check which steps are still outstanding after each request."""

    def submit(**fields):
        # every request is sent through its own fresh API session
        with auth_api_session() as (auth_api, interceptor):
            reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(**fields))
        return reply, interceptor

    def check_outstanding(reply, *, verify_email, guidelines, motivations):
        # Shared expectations for every unfinished step in this scenario: same flow token, no auth result,
        # basic info and feedback not requested, account details still requested.
        assert reply.flow_token == flow_token
        assert not reply.HasField("auth_res")
        assert not reply.need_basic
        assert reply.need_account
        assert not reply.need_feedback
        assert bool(reply.need_verify_email) is verify_email
        assert bool(reply.need_accept_community_guidelines) is guidelines
        assert bool(reply.need_motivations) is motivations

    # Start the flow with basic info only
    started, _ = submit(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"))
    flow_token = started.flow_token
    assert flow_token
    check_outstanding(started, verify_email=True, guidelines=True, motivations=True)

    # read out the signup token directly from the database for now
    with session_scope() as session:
        pending = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert pending.email_sent
        assert not pending.email_verified
        email_token = pending.email_token

    # Re-query the flow without supplying anything new
    polled, _ = submit(flow_token=flow_token)
    check_outstanding(polled, verify_email=True, guidelines=True, motivations=True)

    # Add feedback
    contributor_form = auth_pb2.ContributorForm(
        ideas="I'm a robot, incapable of original ideation",
        features="I love all your features",
        experience="I haven't done couch surfing before",
        contribute=auth_pb2.CONTRIBUTE_OPTION_YES,
        contribute_ways=["serving", "backend"],
        expertise="I'd love to be your server: I can compute very fast, but only simple opcodes",
    )
    after_feedback, _ = submit(flow_token=flow_token, feedback=contributor_form)
    check_outstanding(after_feedback, verify_email=True, guidelines=True, motivations=True)

    # Agree to community guidelines
    after_guidelines, _ = submit(
        flow_token=flow_token,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    check_outstanding(after_guidelines, verify_email=True, guidelines=False, motivations=True)

    # Submit motivations
    after_motivations, _ = submit(
        flow_token=flow_token,
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )
    check_outstanding(after_motivations, verify_email=True, guidelines=False, motivations=False)

    # Verify email
    after_verify, _ = submit(flow_token=flow_token, email_token=email_token)
    check_outstanding(after_verify, verify_email=False, guidelines=False, motivations=False)

    # Finally finish off account info
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    finished, final_interceptor = submit(flow_token=flow_token, account=account)

    assert not finished.flow_token
    assert finished.HasField("auth_res")
    assert finished.auth_res.user_id
    assert not finished.auth_res.jailed
    assert not finished.need_basic
    assert not finished.need_account
    assert not finished.need_feedback
    assert not finished.need_verify_email
    assert not finished.need_accept_community_guidelines
    assert not finished.need_motivations

    user_id = finished.auth_res.user_id

    # the cookies set on the final response must belong to the new user
    sess_token, uid = get_session_cookie_tokens(final_interceptor)
    assert uid == str(user_id)

    with api_session(sess_token) as api:
        profile = api.GetUser(api_pb2.GetUserReq(user=str(user_id)))

    assert profile.username == "frodo"
    assert profile.gender == "Bot"
    assert profile.hosting_status == api_pb2.HOSTING_STATUS_MAYBE
    assert profile.city == "New York City"
    assert profile.lat == 40.7331
    assert profile.lng == -73.9778
    assert profile.radius == 500

    # the feedback submitted earlier must have been stored as the single contributor form
    with session_scope() as session:
        stored_form = session.execute(select(ContributorForm)).scalar_one()

        assert stored_form.ideas == "I'm a robot, incapable of original ideation"
        assert stored_form.features == "I love all your features"
        assert stored_form.experience == "I haven't done couch surfing before"
        assert stored_form.contribute == ContributeOption.yes
        assert stored_form.contribute_ways == ["serving", "backend"]
        assert stored_form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes"

231 

232 

def test_signup_funnel_counters(db):
    """Each per-step signup funnel counter should fire exactly once across an incremental signup."""
    counter_names = (
        "signup_initiations_counter",
        "signup_account_filled_counter",
        "signup_email_verified_counter",
        "signup_guidelines_accepted_counter",
        "signup_motivations_filled_counter",
        "signup_completions_counter",
    )

    def step(**fields):
        # one fresh API session per signup request
        with auth_api_session() as (auth_api, _interceptor):
            return auth_api.SignupFlow(auth_pb2.SignupFlowReq(**fields))

    # replace every counter in the auth servicer module with a mock for the duration of the flow
    with patch.multiple("couchers.servicers.auth", **dict.fromkeys(counter_names, DEFAULT)) as counters:
        started = step(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"))
        flow_token = started.flow_token
        counters["signup_initiations_counter"].inc.assert_called_once()

        with session_scope() as session:
            flow_row = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
            email_token = flow_row.email_token

        step(
            flow_token=flow_token,
            account=auth_pb2.SignupAccount(
                username="frodo",
                password="a very insecure password",
                birthdate="1970-01-01",
                gender="Bot",
                hosting_status=api_pb2.HOSTING_STATUS_MAYBE,
                city="New York City",
                lat=40.7331,
                lng=-73.9778,
                radius=500,
                accept_tos=True,
            ),
        )
        counters["signup_account_filled_counter"].inc.assert_called_once()

        # accept the guidelines twice; the counter must still only fire once
        for _attempt in range(2):
            step(
                flow_token=flow_token,
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        counters["signup_guidelines_accepted_counter"].inc.assert_called_once()

        step(
            flow_token=flow_token,
            motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
        )
        counters["signup_motivations_filled_counter"].inc.assert_called_once()

        finished = step(flow_token=flow_token, email_token=email_token)
        counters["signup_email_verified_counter"].inc.assert_called_once()

        # verifying the email was the last outstanding step, so the flow completes here
        assert finished.HasField("auth_res")
        counters["signup_completions_counter"].labels.assert_called_once_with("Bot")

310 

311 

def _quick_signup() -> int:
    """
    Sign up the user "frodo" in two requests and return the new user's id.

    The first request supplies everything except email verification; the second submits the email token
    read from the database. Also checks that the session cookie on the final response matches the
    session token stored for "frodo".
    """
    everything_but_verification = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1970-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )
    with auth_api_session() as (auth_api, metadata_interceptor):
        first = auth_api.SignupFlow(everything_but_verification)

    flow_token = first.flow_token

    # only email verification should remain outstanding
    assert first.flow_token
    assert not first.HasField("auth_res")
    assert not first.need_basic
    assert not first.need_account
    assert not first.need_feedback
    assert not first.need_motivations
    assert first.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        pending = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert pending.email_sent
        assert not pending.email_verified
        email_token = pending.email_token

    with auth_api_session() as (auth_api, metadata_interceptor):
        done = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not done.flow_token
    assert done.HasField("auth_res")
    assert done.auth_res.user_id
    assert not done.auth_res.jailed
    assert not done.need_basic
    assert not done.need_account
    assert not done.need_feedback
    assert not done.need_motivations
    assert not done.need_verify_email

    # make sure we got the right token in a cookie
    with session_scope() as session:
        sessions_of_frodo = (
            select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
        )
        stored_token = session.execute(sessions_of_frodo).scalar_one()
    cookie_token, _uid = get_session_cookie_tokens(metadata_interceptor)
    assert cookie_token == stored_token

    return cast(int, done.auth_res.user_id)

374 

375 

def test_signup(db):
    """Run the two-request signup helper; its own assertions are the test."""
    _quick_signup()

378 

379 

def test_basic_login(db):
    """Log in with username and password, check the cookie token is a valid stored session, then log out."""
    # Create our test user using signup
    _quick_signup()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(credentials)

    reply_token, _ = get_session_cookie_tokens(metadata_interceptor)

    # the token handed back in the cookie must be a valid session belonging to frodo
    matching_session = (
        select(UserSession.token)
        .join(User, UserSession.user_id == User.id)
        .where(User.username == "frodo")
        .where(UserSession.token == reply_token)
        .where(UserSession.is_valid)
    )
    with session_scope() as session:
        stored_token = session.execute(matching_session).scalar_one_or_none()
        assert stored_token

    # log out
    logout_metadata = (("cookie", f"couchers-sesh={reply_token}"),)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=logout_metadata)

402 

403 

def test_login_part_signed_up_verified_email(db):
    """
    If you try to log in but didn't finish signing up, we send you a new email and ask you to finish signing up.
    """
    address = "email@couchers.org.invalid"

    with auth_api_session() as (auth_api, _interceptor):
        started = auth_api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email=address))
        )

    flow_token = started.flow_token
    assert started.need_verify_email

    # verify the email
    with session_scope() as session:
        pending = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        flow_token = pending.flow_token
        email_token = pending.email_token
    with auth_api_session() as (auth_api, _interceptor):
        auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    with EmailCollector() as email_collector:
        # logging in with the email of an unfinished signup must fail with a pointer to the email
        with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as err:
            auth_api.Authenticate(auth_pb2.AuthReq(user=address, password="wrong pwd"))
        assert err.value.details() == "Please check your email for a link to continue signing up."

        # the email sent in response carries the flow token
        email = email_collector.pop_for_recipient(address, last=True)
        assert email.recipient == address
        assert flow_token in email.plain
        assert flow_token in email.html

434 

435 

def test_login_part_signed_up_not_verified_email(db):
    """Logging in during a signup whose email is unverified fails and sends an email containing the email token."""
    address = "email@couchers.org.invalid"
    partial_signup = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email=address),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="a very insecure password",
            birthdate="1999-01-01",
            gender="Bot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="New York City",
            lat=40.7331,
            lng=-73.9778,
            radius=500,
            accept_tos=True,
        ),
    )

    with auth_api_session() as (auth_api, _interceptor):
        started = auth_api.SignupFlow(partial_signup)

    flow_token = started.flow_token
    assert started.need_verify_email

    with EmailCollector() as email_collector:
        # login by username is refused while the signup is unfinished
        with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as err:
            auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd"))
        assert err.value.details() == "Please check your email for a link to continue signing up."

        with session_scope() as session:
            pending = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
            email_token = pending.email_token

        email = email_collector.pop_for_recipient(address, last=True)
        assert email.recipient == address
        assert email_token
        assert email_token in email.plain
        assert email_token in email.html

474 

475 

def test_banned_user(db):
    """A banned user cannot log in, and the attempt is recorded as a NonvisibleUserAccess row."""
    user_id = _quick_signup()

    # ban the only user in the database
    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.banned_at = now()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.Authenticate(credentials)
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "Your account is suspended."

    with session_scope() as session:
        logged = session.execute(select(NonvisibleUserAccess)).scalar_one()
        assert logged.access_type == NonvisibleUserAccessType.login_attempt
        assert logged.target_state == NonvisibleUserState.banned
        assert logged.target_user_id == user_id
        assert logged.actor_user_id == user_id

494 

495 

def test_shadowed_user_login_logged(db):
    """A shadowed user can still log in, and the login is recorded as a NonvisibleUserAccess row."""
    user_id = _quick_signup()

    # shadow the only user in the database
    with session_scope() as session:
        only_user = session.execute(select(User)).scalar_one()
        only_user.shadowed_at = now()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, _interceptor):
        auth_api.Authenticate(credentials)

    with session_scope() as session:
        logged = session.execute(select(NonvisibleUserAccess)).scalar_one()
        assert logged.access_type == NonvisibleUserAccessType.login_attempt
        assert logged.target_state == NonvisibleUserState.shadowed
        assert logged.target_user_id == user_id
        assert logged.actor_user_id == user_id

511 

512 

def test_deleted_user(db):
    """Logging in to a deleted account fails with NOT_FOUND, as if the account did not exist."""
    user_id = _quick_signup()

    mark_deleted = update(User).where(User.id == user_id).values(deleted_at=func.now())
    with session_scope() as session:
        session.execute(mark_deleted)

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.Authenticate(credentials)
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == "An account with that username or email was not found."

524 

525 

def test_invalid_token(db):
    """An API call made with a random, unknown session token is rejected as UNAUTHENTICATED."""
    # two users are created as in the original scenario; only the second one's username is needed
    generate_user()
    target_user, _ = generate_user()

    wrong_token = random_hex(32)

    with real_api_session(wrong_token) as api, pytest.raises(grpc.RpcError) as e:
        api.GetUser(api_pb2.GetUserReq(user=target_user.username))

    assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED
    assert e.value.details() == "Unauthorized"

537 

538 

def test_password_reset_v2(db, email_collector: EmailCollector, push_collector: PushCollector):
    """
    Full password reset round trip: request a reset, check the email and push notification, reject an
    insecure password, accept a good one (which also sets a valid session cookie), and refuse to reuse the token.
    """
    user, _ = generate_user(hashed_password=hash_password("mypassword"))

    with auth_api_session() as (auth_api, metadata_interceptor):
        reset_reply = auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    with session_scope() as session:
        password_reset_token = session.execute(select(PasswordResetToken.token)).scalar_one()

    # the reset email must carry the token, the explanatory text, the completion link and the support address
    email = email_collector.pop_for_recipient(user.email, last=True)
    assert email.recipient == user.email
    assert "reset" in email.subject.lower()
    assert password_reset_token in email.plain
    assert password_reset_token in email.html
    unique_string = "You asked for your password to be reset on Couchers.org."
    assert unique_string in email.plain
    assert unique_string in email.html
    reset_link = f"http://localhost:3000/complete-password-reset?token={password_reset_token}"
    assert reset_link in email.plain
    assert reset_link in email.html
    assert "support@couchers.org" in email.plain
    assert "support@couchers.org" in email.html

    requested_push = push_collector.pop_for_user(user.id, last=True)
    assert requested_push.content.title == "Password reset requested"
    assert requested_push.content.body == "Use the link we sent by email to complete it."

    # make sure bad password are caught
    weak_attempt = auth_pb2.CompletePasswordResetV2Req(
        password_reset_token=password_reset_token, new_password="password"
    )
    with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as err:
        auth_api.CompletePasswordResetV2(weak_attempt)
    assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert err.value.details() == "The password is insecure. Please use one that is not easily guessable."

    # make sure we can set a good password
    pwd = random_hex()
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.CompletePasswordResetV2(
            auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd)
        )

    completed_push = push_collector.pop_for_user(user.id, last=True)
    assert completed_push.content.title == "Password reset"
    assert completed_push.content.body == "Your password was successfully reset."

    # completing the reset sets a session cookie; it must match a valid stored session for this user
    session_token, _ = get_session_cookie_tokens(metadata_interceptor)

    matching_session = (
        select(UserSession.token)
        .join(User, UserSession.user_id == User.id)
        .where(User.username == user.username)
        .where(UserSession.token == session_token)
        .where(UserSession.is_valid)
    )
    with session_scope() as session:
        other_session_token = session.execute(matching_session).scalar_one_or_none()
        assert other_session_token

    # make sure we can't set a password again
    replay = auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=random_hex())
    with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as err:
        auth_api.CompletePasswordResetV2(replay)
    assert err.value.code() == grpc.StatusCode.NOT_FOUND
    assert err.value.details() == "Invalid token."

    # the stored hash must correspond to the password accepted above, not the replayed one
    with session_scope() as session:
        user = session.execute(select(User)).scalar_one()
        assert user.hashed_password == hash_password(pwd)

611 

612 

def test_password_reset_no_such_user(db):
    """Requesting a reset for an unknown username succeeds at the API level but creates no reset token."""
    existing_user, existing_token = generate_user()

    request = auth_pb2.ResetPasswordReq(user="nonexistentuser")
    with auth_api_session() as (auth_api, _interceptor):
        reply = auth_api.ResetPassword(request)

    with session_scope() as session:
        stored = session.execute(select(PasswordResetToken)).scalar_one_or_none()
        assert stored is None

625 

626 

def test_password_reset_invalid_token_v2(db):
    """Completing a reset with a wrong token fails with NOT_FOUND and leaves the stored password hash unchanged."""
    password = random_hex()
    user, _session_token = generate_user(hashed_password=hash_password(password))

    with auth_api_session() as (auth_api, _interceptor):
        reset_reply = auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username))

    bogus_completion = auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken")
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
        completion_reply = auth_api.CompletePasswordResetV2(bogus_completion)
    assert e.value.code() == grpc.StatusCode.NOT_FOUND
    assert e.value.details() == "Invalid token."

    with session_scope() as session:
        user = session.execute(select(User)).scalar_one()
        assert user.hashed_password == hash_password(password)

646 

647 

def test_logout_invalid_token(db):
    """Logging out after the login tokens were deleted still succeeds and sets an empty session cookie."""
    # Create our test user using signup
    _quick_signup()

    credentials = auth_pb2.AuthReq(user="frodo", password="a very insecure password")
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Authenticate(credentials)

    login_cookie, _ = get_session_cookie_tokens(metadata_interceptor)

    # delete all login tokens
    # NOTE(review): this removes LoginToken rows, not UserSession rows — confirm this is what makes the
    # cookie token "non-existent" for the logout below.
    with session_scope() as session:
        session.execute(delete(LoginToken))

    # log out with non-existent token should still return a valid result
    logout_metadata = (("cookie", f"couchers-sesh={login_cookie}"),)
    with auth_api_session() as (auth_api, metadata_interceptor):
        auth_api.Deauthenticate(empty_pb2.Empty(), metadata=logout_metadata)

    cleared_cookie, _ = get_session_cookie_tokens(metadata_interceptor)
    # make sure we set an empty cookie
    assert cleared_cookie == ""

668 

669 

def test_signup_without_password(db):
    """A signup whose password is shorter than 8 characters is rejected with INVALID_ARGUMENT."""
    request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"),
        account=auth_pb2.SignupAccount(
            username="frodo",
            password="bad",
            city="Minas Tirith",
            birthdate="9999-12-31",  # arbitrary future birthdate
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        ),
        feedback=auth_pb2.ContributorForm(),
    )

    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.SignupFlow(request)
    assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
    assert e.value.details() == "The password must be 8 or more characters long."

693 

694 

def test_signup_invalid_birthdate(db):
    """Signups with a future or under-18 birthdate are rejected; an adult birthdate creates the only signup flow."""

    def signup_request(*, name, email, username, city, birthdate, gender):
        # all three attempts share everything except the identity fields and the birthdate
        return auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name=name, email=email),
            account=auth_pb2.SignupAccount(
                username=username,
                password="a very insecure password",
                city=city,
                birthdate=birthdate,
                gender=gender,
                hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
                lat=1,
                lng=1,
                radius=100,
                accept_tos=True,
            ),
            feedback=auth_pb2.ContributorForm(),
        )

    too_young_message = "You must be at least 18 years old to sign up."

    with auth_api_session() as (auth_api, _interceptor):
        from_the_future = signup_request(
            name="Räksmörgås",
            email="a1@b.com",
            username="frodo",
            city="Minas Tirith",
            birthdate="9999-12-31",  # arbitrary future birthdate
            gender="Robot",
        )
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(from_the_future)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == too_young_message

        adult = signup_request(
            name="Christopher",
            email="a2@b.com",
            username="ceelo",
            city="New York City",
            birthdate="2000-12-31",  # arbitrary birthdate older than 18 years
            gender="Helicopter",
        )
        accepted = auth_api.SignupFlow(adult)

        assert accepted.flow_token

        minor = signup_request(
            name="Franklin",
            email="a3@b.com",
            username="franklin",
            city="Los Santos",
            birthdate="2010-04-09",  # arbitrary birthdate < 18 yrs
            gender="Male",
        )
        with pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(minor)
        assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert e.value.details() == too_young_message

    # only the accepted attempt may have left a signup flow behind
    with session_scope() as session:
        flow_count = session.execute(select(func.count()).select_from(SignupFlow)).scalar_one()
        assert flow_count == 1

764 

765 

def test_signup_invalid_email(db):
    """Starting a signup with a malformed email address is rejected with INVALID_ARGUMENT."""
    malformed_addresses = ("a", "a@b", "a@b.", "a@b.c")

    for address in malformed_addresses:
        request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=address))
        # each attempt uses its own fresh API session
        with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
            auth_api.SignupFlow(request)
        # the address is included in the failure message so a failing case can be told apart
        assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT, address
        assert e.value.details() == "Invalid email.", address

790 

791 

def test_signup_existing_email(db):
    """Starting a signup with the email of an existing user is rejected with a hint to log in."""
    # Signed up user
    existing, _ = generate_user()

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=existing.email))
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.SignupFlow(request)
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "That email address is already associated with an account. Please log in instead!"

801 

802 

def test_signup_banned_user_email(db):
    """Starting a signup with the email of a banned user is rejected with FAILED_PRECONDITION."""
    banned, _ = generate_user()

    mark_banned = update(User).where(User.id == banned.id).values(banned_at=func.now())
    with session_scope() as session:
        session.execute(mark_banned)

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=banned.email))
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.SignupFlow(request)
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "You cannot sign up with that email address."

814 

815 

def test_signup_deleted_user_email(db):
    """Starting a signup with the email of a deleted user is rejected with FAILED_PRECONDITION."""
    deleted, _ = generate_user()

    mark_deleted = update(User).where(User.id == deleted.id).values(deleted_at=func.now())
    with session_scope() as session:
        session.execute(mark_deleted)

    request = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=deleted.email))
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
        auth_api.SignupFlow(request)
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "You cannot sign up with that email address."

827 

828 

def test_signup_continue_with_email(db):
    """Starting a second signup with an email that already has a flow is refused with a pointer to the email."""
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    def basic_only_request():
        # both attempts send exactly the same basic info
        return auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))

    with auth_api_session() as (auth_api, _interceptor):
        first = auth_api.SignupFlow(basic_only_request())
    flow_token = first.flow_token
    assert flow_token

    # continue with same email, should just send another email to the user
    with auth_api_session() as (auth_api, _interceptor), pytest.raises(grpc.RpcError) as e:
        second = auth_api.SignupFlow(basic_only_request())
    assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION
    assert e.value.details() == "Please check your email for a link to continue signing up."

844 

845 

def test_signup_resend_email(db, email_collector: EmailCollector):
    """
    Requesting a resend of the verification email yields an email containing the flow's email token,
    leaves the flow unverified, and the token then completes the signup.
    """
    recipient = "email@couchers.org.invalid"

    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    full_request = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )
    with auth_api_session() as (api, _interceptor):
        first_reply = api.SignupFlow(full_request)

    # Discard the email produced by the initial signup request
    email_collector.pop_for_recipient(recipient, last=True)

    flow_token = first_reply.flow_token
    assert flow_token

    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow)).scalar_one()
        assert signup_flow.flow_token == flow_token
        assert signup_flow.email_sent
        assert not signup_flow.email_verified
        email_token = signup_flow.email_token

    # ask for a new signup email
    resend_request = auth_pb2.SignupFlowReq(
        flow_token=flow_token,
        resend_verification_email=True,
    )
    with auth_api_session() as (api, _interceptor):
        api.SignupFlow(resend_request)

    resent_email = email_collector.pop_for_recipient(recipient, last=True)
    assert email_token
    assert email_token in resent_email.plain
    assert email_token in resent_email.html

    # Resending must not mark the email as verified
    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow)).scalar_one()
        assert not signup_flow.email_verified

    verify_request = auth_pb2.SignupFlowReq(
        email_token=email_token,
    )
    with auth_api_session() as (api, _interceptor):
        final_reply = api.SignupFlow(verify_request)

    assert not final_reply.flow_token
    assert final_reply.HasField("auth_res")

908 

909 

def test_successful_authenticate(db):
    """A user can authenticate with either username or email, and is not jailed."""
    user, _ = generate_user(hashed_password=hash_password("password"))

    # Authenticate with username
    with auth_api_session() as (api, _interceptor):
        by_username = auth_pb2.AuthReq(user=user.username, password="password")
        assert not api.Authenticate(by_username).jailed

    # Authenticate with email
    with auth_api_session() as (api, _interceptor):
        by_email = auth_pb2.AuthReq(user=user.email, password="password")
        assert not api.Authenticate(by_email).jailed

922 

923 

def test_unsuccessful_authenticate(db):
    """
    Authenticate fails with NOT_FOUND for a wrong password and for unknown usernames, emails and ids,
    each with the expected error message.
    """
    user, _ = generate_user(hashed_password=hash_password("password"))

    def assert_login_rejected(login: str, password: str, expected_details: str) -> None:
        """Attempt to authenticate in a fresh auth session and check the NOT_FOUND error returned."""
        with auth_api_session() as (auth_api, metadata_interceptor):
            with pytest.raises(grpc.RpcError) as e:
                # the call always raises, so its result is deliberately not bound to a name
                auth_api.Authenticate(auth_pb2.AuthReq(user=login, password=password))
            assert e.value.code() == grpc.StatusCode.NOT_FOUND
            assert e.value.details() == expected_details

    # Invalid password
    assert_login_rejected(user.username, "incorrectpassword", "Wrong username/email or password.")

    # Invalid username
    assert_login_rejected("notarealusername", "password", "An account with that username or email was not found.")

    # Invalid email
    assert_login_rejected(
        f"{random_hex(12)}@couchers.org.invalid",
        "password",
        "An account with that username or email was not found.",
    )

    # Invalid id
    assert_login_rejected("-1", "password", "An account with that username or email was not found.")

956 

957 

def test_complete_signup(db):
    """
    Check the validation errors SignupFlow returns for bad signup data: invalid username, unsupported
    name, missing hosting status, already-taken username and invalid coordinates.
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (api, _interceptor):
        basic_reply = api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
        )

    flow_token = basic_reply.flow_token

    def account_request(**overrides):
        """Build an account-step request for the flow, with the given fields replacing the defaults."""
        fields = {
            "username": "frodo",
            "password": "a very insecure password",
            "city": "Minas Tirith",
            "birthdate": "1980-12-31",
            "gender": "Robot",
            "hosting_status": api_pb2.HOSTING_STATUS_CAN_HOST,
            "lat": 1,
            "lng": 1,
            "radius": 100,
            "accept_tos": True,
        }
        fields.update(overrides)
        return auth_pb2.SignupFlowReq(flow_token=flow_token, account=auth_pb2.SignupAccount(**fields))

    def assert_rejected(request, expected_code, expected_details):
        """Send the request in a fresh auth session and check the error it is rejected with."""
        with auth_api_session() as (api, _interceptor):
            with pytest.raises(grpc.RpcError) as exc_info:
                api.SignupFlow(request)
            assert exc_info.value.code() == expected_code
            assert exc_info.value.details() == expected_details

    # Invalid username
    assert_rejected(account_request(username=" "), grpc.StatusCode.INVALID_ARGUMENT, "Invalid username.")

    # Invalid name
    assert_rejected(
        auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid")),
        grpc.StatusCode.INVALID_ARGUMENT,
        "Name not supported.",
    )

    # Hosting status required
    assert_rejected(
        account_request(hosting_status=None),
        grpc.StatusCode.INVALID_ARGUMENT,
        "Hosting status is required.",
    )

    # Username unavailable
    existing_user, _ = generate_user()
    assert_rejected(
        account_request(username=existing_user.username),
        grpc.StatusCode.FAILED_PRECONDITION,
        "Sorry, that username isn't available.",
    )

    # Invalid coordinate
    assert_rejected(account_request(lat=0, lng=0), grpc.StatusCode.INVALID_ARGUMENT, "Invalid coordinate.")

1070 

1071 

def test_signup_token_regression(db):
    """
    Regression test for restarting a signup after the email was confirmed.

    Repro steps:
    1. Start a signup
    2. Confirm the email
    3. Start a new signup with the same email
    Expected: send a link to the email to continue signing up.
    Actual: `AttributeError: 'SignupFlow' object has no attribute 'token'`
    """
    testing_email = f"{random_hex(12)}@couchers.org.invalid"

    # 1. Start a signup
    with auth_api_session() as (api, _interceptor):
        start_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))
        flow_token = api.SignupFlow(start_req).flow_token
        assert flow_token

    # 2. Confirm the email
    token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
    with session_scope() as session:
        email_token = session.execute(token_query).scalar_one()

    with auth_api_session() as (api, _interceptor):
        confirm_req = auth_pb2.SignupFlowReq(
            flow_token=flow_token,
            email_token=email_token,
        )
        api.SignupFlow(confirm_req)

    # 3. Start a new signup with the same email
    with auth_api_session() as (api, _interceptor):
        restart_req = auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))
        with pytest.raises(grpc.RpcError) as exc_info:
            api.SignupFlow(restart_req)
        error = exc_info.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "Please check your email for a link to continue signing up."

1108 

1109 

@pytest.mark.parametrize("opt_out", [True, False])
def test_opt_out_of_newsletter(db, opt_out):
    """The newsletter opt-out choice from the account step is stored on the created user."""
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
        opt_out_of_newsletter=opt_out,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        feedback=auth_pb2.ContributorForm(),
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )
    with auth_api_session() as (api, _interceptor):
        signup_reply = api.SignupFlow(signup_req)

    # Read the email token straight from the database to verify the email
    with session_scope() as session:
        token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == signup_reply.flow_token)
        email_token = session.execute(token_query).scalar_one()

    with auth_api_session() as (api, _interceptor):
        verify_reply = api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    user_id = verify_reply.auth_res.user_id

    with session_scope() as session:
        created_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert not created_user.in_sync_with_newsletter
        assert created_user.opt_out_of_newsletter == opt_out

1149 

1150 

def test_GetAuthState(db):
    """
    GetAuthState reports logged-out without a cookie and after Deauthenticate, logged-in with a valid
    session cookie, and flags the user generated with ``accepted_tos=0`` as jailed.
    """
    user, token = generate_user()
    jailed_user, jailed_token = generate_user(accepted_tos=0)

    user_cookie = (("cookie", f"couchers-sesh={token}"),)
    jailed_cookie = (("cookie", f"couchers-sesh={jailed_token}"),)

    # No cookie at all
    with auth_api_session() as (api, _interceptor):
        state = api.GetAuthState(empty_pb2.Empty())
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # Valid session cookie, then log out and check again with the same cookie
    with auth_api_session() as (api, _interceptor):
        state = api.GetAuthState(empty_pb2.Empty(), metadata=user_cookie)
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == user.id
        assert not state.auth_res.jailed

        api.Deauthenticate(empty_pb2.Empty(), metadata=user_cookie)

        state = api.GetAuthState(empty_pb2.Empty(), metadata=user_cookie)
        assert not state.logged_in
        assert not state.HasField("auth_res")

    # Session cookie of the jailed user
    with auth_api_session() as (api, _interceptor):
        state = api.GetAuthState(empty_pb2.Empty(), metadata=jailed_cookie)
        assert state.logged_in
        assert state.HasField("auth_res")
        assert state.auth_res.user_id == jailed_user.id
        assert state.auth_res.jailed

1179 

1180 

def test_signup_no_feedback_regression(db):
    """
    When the feedback form was first removed, the backend reported that it was not needed but did not
    complete the signup; this regression test checks that a signup without feedback completes.
    """
    account = auth_pb2.SignupAccount(
        username="frodo",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
        motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
    )
    with auth_api_session() as (api, interceptor):
        reply = api.SignupFlow(signup_req)

    flow_token = reply.flow_token

    # Only the email verification should still be outstanding
    assert reply.flow_token
    assert not reply.HasField("auth_res")
    assert not reply.need_basic
    assert not reply.need_account
    assert not reply.need_feedback
    assert not reply.need_motivations
    assert reply.need_verify_email

    # read out the signup token directly from the database for now
    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert signup_flow.email_sent
        assert not signup_flow.email_verified
        email_token = signup_flow.email_token

    with auth_api_session() as (api, interceptor):
        reply = api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    # The flow is finished: no flow token, an auth result, and nothing left to fill in
    assert not reply.flow_token
    assert reply.HasField("auth_res")
    assert reply.auth_res.user_id
    assert not reply.auth_res.jailed
    assert not reply.need_basic
    assert not reply.need_account
    assert not reply.need_feedback
    assert not reply.need_motivations
    assert not reply.need_verify_email

    # make sure we got the right token in a cookie
    session_token_query = (
        select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo")
    )
    with session_scope() as session:
        token = session.execute(session_token_query).scalar_one()
    sesh, _uid = get_session_cookie_tokens(interceptor)
    assert sesh == token

1244 

1245 

def test_banned_username(db):
    """The account step rejects the username "thecouchersadminaccount" as unavailable."""
    testing_email = f"{random_hex(12)}@couchers.org.invalid"
    with auth_api_session() as (api, _interceptor):
        basic_reply = api.SignupFlow(
            auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email))
        )

    flow_token = basic_reply.flow_token

    with auth_api_session() as (api, _interceptor):
        # Banned username
        account = auth_pb2.SignupAccount(
            username="thecouchersadminaccount",
            password="a very insecure password",
            city="Minas Tirith",
            birthdate="1980-12-31",
            gender="Robot",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            lat=1,
            lng=1,
            radius=100,
            accept_tos=True,
        )
        account_req = auth_pb2.SignupFlowReq(flow_token=flow_token, account=account)
        with pytest.raises(grpc.RpcError) as exc_info:
            api.SignupFlow(account_req)
        error = exc_info.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "Sorry, that username isn't available."

1277 

1278 

1279# Tests for ConfirmChangeEmail live in test_account.py, as part of the test_ChangeEmail_* tests

1280 

1281 

def test_GetInviteCodeInfo(db):
    """GetInviteCodeInfo returns the inviter's name, username, thumbnail avatar URL and invite link."""
    inviter, token = generate_user(complete_profile=True)

    with account_session(token) as account:
        code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code

    with auth_api_session() as (auth, _):
        info = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code))
        assert info.name == inviter.name
        assert info.username == inviter.username
        # Avatar URL should be a thumbnail URL with a hashed filename
        assert "/img/thumbnail/" in info.avatar_url
        assert info.avatar_url.endswith(".jpg")
        # Verify the hashed filename looks correct (64 char hex hash)
        avatar_filename = info.avatar_url.split("/")[-1]
        assert len(avatar_filename.replace(".jpg", "")) == 64
        assert info.url == urls.invite_code_link(code=code)

1298 

1299 

def test_GetInviteCodeInfo_no_avatar(db):
    """For a user generated with ``complete_profile=False`` the invite info has an empty avatar URL."""
    inviter, token = generate_user(complete_profile=False)

    with account_session(token) as account:
        code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code

    with auth_api_session() as (auth, _):
        info = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code))
        assert info.name == inviter.name
        assert info.username == inviter.username
        assert info.avatar_url == ""
        assert info.url == urls.invite_code_link(code=code)

1312 

1313 

def test_GetInviteCodeInfo_not_found(db):
    """Looking up an unknown invite code fails with NOT_FOUND."""
    generate_user()

    with auth_api_session() as (auth, _):
        lookup_req = auth_pb2.GetInviteCodeInfoReq(code="BADCODE")
        with pytest.raises(grpc.RpcError) as exc_info:
            auth.GetInviteCodeInfo(lookup_req)
        error = exc_info.value
        assert error.code() == grpc.StatusCode.NOT_FOUND
        assert error.details() == "Invite code not found."

1322 

1323 

def test_SignupFlow_invite_code(db):
    """An invite code given in the basic signup step ends up as ``invite_code_id`` on the new user."""
    inviter, token = generate_user()

    with account_session(token) as account:
        invite_code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code

    with auth_api_session() as (api, _):
        # Signup basic step with invite code
        basic = auth_pb2.SignupBasic(
            name="Test User",
            email="inviteuser@example.com",
            invite_code=invite_code,
        )
        basic_reply = api.SignupFlow(auth_pb2.SignupFlowReq(basic=basic))
        flow_token = basic_reply.flow_token
        assert flow_token

        # Confirm email
        token_query = select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token)
        with session_scope() as session:
            email_token = session.execute(token_query).scalar_one()

        api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

        # Signup account step
        account_fields = auth_pb2.SignupAccount(
            username="invited_user",
            password="secure password",
            birthdate="1990-01-01",
            gender="Other",
            hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
            city="Example City",
            lat=1,
            lng=5,
            radius=100,
            accept_tos=True,
        )
        api.SignupFlow(
            auth_pb2.SignupFlowReq(
                flow_token=flow_token,
                account=account_fields,
                feedback=auth_pb2.ContributorForm(),
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
                motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
            )
        )

        # Check that invite_code_id is stored in the final User object
        stored_code_query = select(User.invite_code_id).where(User.username == "invited_user")
        with session_scope() as session:
            invite_code_id = session.execute(stored_code_query).scalar_one()
            assert invite_code_id == invite_code

1380 

1381 

def test_signup_with_motivations(db):
    """
    Signup with the motivations step: heard_about_couchers and the motivations list are stored on the
    SignupFlow and copied to the User once the email is verified.
    """
    expected_motivations = {"hosting", "surfing", "events"}

    account = auth_pb2.SignupAccount(
        username="intentuser",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    motivations = auth_pb2.SignupMotivations(
        heard_about_couchers="friend",
        motivations=["hosting", "surfing", "events"],
    )
    signup_req = auth_pb2.SignupFlowReq(
        basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"),
        account=account,
        motivations=motivations,
        accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
    )
    with auth_api_session() as (api, _interceptor):
        signup_reply = api.SignupFlow(signup_req)

    flow_token = signup_reply.flow_token
    assert flow_token
    assert not signup_reply.HasField("auth_res")
    assert signup_reply.need_verify_email

    # Verify the motivations are stored in the SignupFlow
    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert signup_flow.heard_about_couchers == "friend"
        assert set(signup_flow.signup_motivations) == expected_motivations
        email_token = signup_flow.email_token

    # Complete signup by verifying email
    with auth_api_session() as (api, _interceptor):
        verify_reply = api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert verify_reply.HasField("auth_res")
    user_id = verify_reply.auth_res.user_id

    # Verify the motivations are transferred to the User object
    with session_scope() as session:
        created_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert created_user.heard_about_couchers == "friend"
        assert created_user.signup_motivations is not None
        assert set(created_user.signup_motivations) == expected_motivations

1435 

1436 

def test_signup_motivations_incremental(db):
    """Motivations can be submitted on their own as a later step of an existing flow."""
    with auth_api_session() as (api, _interceptor):
        # First, basic signup
        basic_req = auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name="testing", email="email2@couchers.org.invalid"),
        )
        basic_reply = api.SignupFlow(basic_req)

    flow_token = basic_reply.flow_token
    assert flow_token
    assert basic_reply.need_account
    assert basic_reply.need_motivations  # New field

    # Submit motivations separately
    with auth_api_session() as (api, _interceptor):
        motivations_req = auth_pb2.SignupFlowReq(
            flow_token=flow_token,
            motivations=auth_pb2.SignupMotivations(
                heard_about_couchers="social_media",
                motivations=["surfing"],
            ),
        )
        motivations_reply = api.SignupFlow(motivations_req)

    assert motivations_reply.flow_token == flow_token
    assert not motivations_reply.need_motivations  # Should be filled now
    assert motivations_reply.need_account  # Still need account

    # Verify motivations are stored
    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert signup_flow.heard_about_couchers == "social_media"
        assert signup_flow.signup_motivations == ["surfing"]

1475 

1476 

def test_signup_motivations_cannot_be_refilled(db):
    """Submitting motivations a second time for the same flow fails with FAILED_PRECONDITION."""
    with auth_api_session() as (api, _interceptor):
        first_req = auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name="testing", email="email3@couchers.org.invalid"),
            motivations=auth_pb2.SignupMotivations(
                heard_about_couchers="friend",
                motivations=["hosting"],
            ),
        )
        first_reply = api.SignupFlow(first_req)

    flow_token = first_reply.flow_token

    # Try to submit motivations again - should fail
    with auth_api_session() as (api, _interceptor):
        second_req = auth_pb2.SignupFlowReq(
            flow_token=flow_token,
            motivations=auth_pb2.SignupMotivations(
                heard_about_couchers="different_source",
                motivations=["surfing"],
            ),
        )
        with pytest.raises(grpc.RpcError) as exc_info:
            api.SignupFlow(second_req)
        error = exc_info.value
        assert error.code() == grpc.StatusCode.FAILED_PRECONDITION
        assert error.details() == "You've already told us about why you are signing up."

1508 

1509 

def test_signup_motivations_required(db):
    """
    A signup without motivations does not complete, even after the email is verified; it completes
    once the motivations are submitted.
    """
    account = auth_pb2.SignupAccount(
        username="nointents",
        password="a very insecure password",
        birthdate="1970-01-01",
        gender="Bot",
        hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST,
        city="New York City",
        lat=40.7331,
        lng=-73.9778,
        radius=500,
        accept_tos=True,
    )
    with auth_api_session() as (api, _interceptor):
        signup_reply = api.SignupFlow(
            auth_pb2.SignupFlowReq(
                basic=auth_pb2.SignupBasic(name="testing", email="email4@couchers.org.invalid"),
                account=account,
                # No motivations provided
                accept_community_guidelines=wrappers_pb2.BoolValue(value=True),
            )
        )

    flow_token = signup_reply.flow_token
    assert not signup_reply.HasField("auth_res")
    assert signup_reply.need_motivations  # Intents still required

    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        email_token = signup_flow.email_token

    # Verify email - signup still not complete without motivations
    with auth_api_session() as (api, _interceptor):
        verify_reply = api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token))

    assert not verify_reply.HasField("auth_res")
    assert verify_reply.need_motivations

    # Now submit motivations
    with auth_api_session() as (api, _interceptor):
        motivations_req = auth_pb2.SignupFlowReq(
            flow_token=flow_token,
            motivations=auth_pb2.SignupMotivations(motivations=["surfing"]),
        )
        final_reply = api.SignupFlow(motivations_req)

    assert final_reply.HasField("auth_res")
    user_id = final_reply.auth_res.user_id

    with session_scope() as session:
        created_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        assert created_user.signup_motivations == ["surfing"]

1565 

1566 

def test_signup_motivations_all_options(db):
    """Submitting "hosting", "surfing" and "events" together stores all three on the flow."""
    with auth_api_session() as (api, _interceptor):
        signup_req = auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name="testing", email="email5@couchers.org.invalid"),
            motivations=auth_pb2.SignupMotivations(
                heard_about_couchers="other",
                motivations=["hosting", "surfing", "events"],
            ),
        )
        signup_reply = api.SignupFlow(signup_req)

    flow_token = signup_reply.flow_token

    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert signup_flow.heard_about_couchers == "other"
        assert set(signup_flow.signup_motivations) == {"hosting", "surfing", "events"}

1588 

1589 

def test_signup_motivations_empty_motivations_list(db):
    """Providing heard_about_couchers with an empty motivations list is accepted and stored as ``[]``."""
    with auth_api_session() as (api, _interceptor):
        signup_req = auth_pb2.SignupFlowReq(
            basic=auth_pb2.SignupBasic(name="testing", email="email6@couchers.org.invalid"),
            motivations=auth_pb2.SignupMotivations(
                heard_about_couchers="former_cs_member",
                motivations=[],  # No specific motivations selected
            ),
        )
        signup_reply = api.SignupFlow(signup_req)

    flow_token = signup_reply.flow_token

    with session_scope() as session:
        signup_flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one()
        assert signup_flow.heard_about_couchers == "former_cs_member"
        assert signup_flow.signup_motivations == []