Coverage for src/tests/test_auth.py: 100%

527 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-12 16:45 +0000

1import http.cookies 

2 

3import grpc 

4import pytest 

5from google.protobuf import empty_pb2, wrappers_pb2 

6from sqlalchemy.sql import delete, func 

7 

8from couchers import errors 

9from couchers.crypto import hash_password, random_hex 

10from couchers.db import session_scope 

11from couchers.models import ( 

12 ContributeOption, 

13 ContributorForm, 

14 LoginToken, 

15 PasswordResetToken, 

16 SignupFlow, 

17 User, 

18 UserSession, 

19) 

20from couchers.sql import couchers_select as select 

21from proto import api_pb2, auth_pb2 

22from tests.test_fixtures import ( # noqa 

23 api_session, 

24 auth_api_session, 

25 db, 

26 email_fields, 

27 fast_passwords, 

28 generate_user, 

29 mock_notification_email, 

30 push_collector, 

31 real_api_session, 

32 testconfig, 

33) 

34 

35 

36@pytest.fixture(autouse=True) 

37def _(testconfig, fast_passwords): 

38 pass 

39 

40 

41def get_session_cookie_tokens(metadata_interceptor): 

42 set_cookies = [val for key, val in metadata_interceptor.latest_header_raw if key == "set-cookie"] 

43 sesh = http.cookies.SimpleCookie([v for v in set_cookies if "sesh" in v][0])["couchers-sesh"].value 

44 uid = http.cookies.SimpleCookie([v for v in set_cookies if "user-id" in v][0])["couchers-user-id"].value 

45 return sesh, uid 

46 

47 

48def test_UsernameValid(db): 

49 with auth_api_session() as (auth_api, metadata_interceptor): 

50 assert auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test")).valid 

51 

52 with auth_api_session() as (auth_api, metadata_interceptor): 

53 assert not auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="")).valid 

54 

55 

56def test_signup_incremental(db): 

57 with auth_api_session() as (auth_api, metadata_interceptor): 

58 res = auth_api.SignupFlow( 

59 auth_pb2.SignupFlowReq( 

60 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

61 ) 

62 ) 

63 

64 flow_token = res.flow_token 

65 assert res.flow_token 

66 assert not res.HasField("auth_res") 

67 assert not res.need_basic 

68 assert res.need_account 

69 assert not res.need_feedback 

70 assert res.need_verify_email 

71 assert res.need_accept_community_guidelines 

72 

73 # read out the signup token directly from the database for now 

74 with session_scope() as session: 

75 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

76 assert flow.email_sent 

77 assert not flow.email_verified 

78 email_token = flow.email_token 

79 

80 with auth_api_session() as (auth_api, metadata_interceptor): 

81 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token)) 

82 

83 assert res.flow_token == flow_token 

84 assert not res.HasField("auth_res") 

85 assert not res.need_basic 

86 assert res.need_account 

87 assert not res.need_feedback 

88 assert res.need_verify_email 

89 assert res.need_accept_community_guidelines 

90 

91 # Add feedback 

92 with auth_api_session() as (auth_api, metadata_interceptor): 

93 res = auth_api.SignupFlow( 

94 auth_pb2.SignupFlowReq( 

95 flow_token=flow_token, 

96 feedback=auth_pb2.ContributorForm( 

97 ideas="I'm a robot, incapable of original ideation", 

98 features="I love all your features", 

99 experience="I haven't done couch surfing before", 

100 contribute=auth_pb2.CONTRIBUTE_OPTION_YES, 

101 contribute_ways=["serving", "backend"], 

102 expertise="I'd love to be your server: I can compute very fast, but only simple opcodes", 

103 ), 

104 ) 

105 ) 

106 

107 assert res.flow_token == flow_token 

108 assert not res.HasField("auth_res") 

109 assert not res.need_basic 

110 assert res.need_account 

111 assert not res.need_feedback 

112 assert res.need_verify_email 

113 assert res.need_accept_community_guidelines 

114 

115 # Agree to community guidelines 

116 with auth_api_session() as (auth_api, metadata_interceptor): 

117 res = auth_api.SignupFlow( 

118 auth_pb2.SignupFlowReq( 

119 flow_token=flow_token, 

120 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

121 ) 

122 ) 

123 

124 assert res.flow_token == flow_token 

125 assert not res.HasField("auth_res") 

126 assert not res.need_basic 

127 assert res.need_account 

128 assert not res.need_feedback 

129 assert res.need_verify_email 

130 assert not res.need_accept_community_guidelines 

131 

132 # Verify email 

133 with auth_api_session() as (auth_api, metadata_interceptor): 

134 res = auth_api.SignupFlow( 

135 auth_pb2.SignupFlowReq( 

136 flow_token=flow_token, 

137 email_token=email_token, 

138 ) 

139 ) 

140 

141 assert res.flow_token == flow_token 

142 assert not res.HasField("auth_res") 

143 assert not res.need_basic 

144 assert res.need_account 

145 assert not res.need_feedback 

146 assert not res.need_verify_email 

147 assert not res.need_accept_community_guidelines 

148 

149 # Finally finish off account info 

150 with auth_api_session() as (auth_api, metadata_interceptor): 

151 res = auth_api.SignupFlow( 

152 auth_pb2.SignupFlowReq( 

153 flow_token=flow_token, 

154 account=auth_pb2.SignupAccount( 

155 username="frodo", 

156 password="a very insecure password", 

157 birthdate="1970-01-01", 

158 gender="Bot", 

159 hosting_status=api_pb2.HOSTING_STATUS_MAYBE, 

160 city="New York City", 

161 lat=40.7331, 

162 lng=-73.9778, 

163 radius=500, 

164 accept_tos=True, 

165 ), 

166 ) 

167 ) 

168 

169 assert not res.flow_token 

170 assert res.HasField("auth_res") 

171 assert res.auth_res.user_id 

172 assert not res.auth_res.jailed 

173 assert not res.need_basic 

174 assert not res.need_account 

175 assert not res.need_feedback 

176 assert not res.need_verify_email 

177 assert not res.need_accept_community_guidelines 

178 

179 user_id = res.auth_res.user_id 

180 

181 sess_token, uid = get_session_cookie_tokens(metadata_interceptor) 

182 assert uid == str(user_id) 

183 

184 with api_session(sess_token) as api: 

185 res = api.GetUser(api_pb2.GetUserReq(user=str(user_id))) 

186 

187 assert res.username == "frodo" 

188 assert res.gender == "Bot" 

189 assert res.hosting_status == api_pb2.HOSTING_STATUS_MAYBE 

190 assert res.city == "New York City" 

191 assert res.lat == 40.7331 

192 assert res.lng == -73.9778 

193 assert res.radius == 500 

194 

195 with session_scope() as session: 

196 form = session.execute(select(ContributorForm)).scalar_one() 

197 

198 assert form.ideas == "I'm a robot, incapable of original ideation" 

199 assert form.features == "I love all your features" 

200 assert form.experience == "I haven't done couch surfing before" 

201 assert form.contribute == ContributeOption.yes 

202 assert form.contribute_ways == ["serving", "backend"] 

203 assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes" 

204 

205 

206def _quick_signup(): 

207 with auth_api_session() as (auth_api, metadata_interceptor): 

208 res = auth_api.SignupFlow( 

209 auth_pb2.SignupFlowReq( 

210 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

211 account=auth_pb2.SignupAccount( 

212 username="frodo", 

213 password="a very insecure password", 

214 birthdate="1970-01-01", 

215 gender="Bot", 

216 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

217 city="New York City", 

218 lat=40.7331, 

219 lng=-73.9778, 

220 radius=500, 

221 accept_tos=True, 

222 ), 

223 feedback=auth_pb2.ContributorForm(), 

224 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

225 ) 

226 ) 

227 

228 flow_token = res.flow_token 

229 

230 assert res.flow_token 

231 assert not res.HasField("auth_res") 

232 assert not res.need_basic 

233 assert not res.need_account 

234 assert not res.need_feedback 

235 assert res.need_verify_email 

236 

237 # read out the signup token directly from the database for now 

238 with session_scope() as session: 

239 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

240 assert flow.email_sent 

241 assert not flow.email_verified 

242 email_token = flow.email_token 

243 

244 with auth_api_session() as (auth_api, metadata_interceptor): 

245 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

246 

247 assert not res.flow_token 

248 assert res.HasField("auth_res") 

249 assert res.auth_res.user_id 

250 assert not res.auth_res.jailed 

251 assert not res.need_basic 

252 assert not res.need_account 

253 assert not res.need_feedback 

254 assert not res.need_verify_email 

255 

256 # make sure we got the right token in a cookie 

257 with session_scope() as session: 

258 token = ( 

259 session.execute( 

260 select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo") 

261 ).scalar_one() 

262 ).token 

263 sesh, uid = get_session_cookie_tokens(metadata_interceptor) 

264 assert sesh == token 

265 

266 

267def test_signup(db): 

268 _quick_signup() 

269 

270 

271def test_basic_login(db): 

272 # Create our test user using signup 

273 _quick_signup() 

274 

275 with auth_api_session() as (auth_api, metadata_interceptor): 

276 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

277 

278 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

279 

280 with session_scope() as session: 

281 token = ( 

282 session.execute( 

283 select(UserSession) 

284 .join(User, UserSession.user_id == User.id) 

285 .where(User.username == "frodo") 

286 .where(UserSession.token == reply_token) 

287 .where(UserSession.is_valid) 

288 ).scalar_one_or_none() 

289 ).token 

290 assert token 

291 

292 # log out 

293 with auth_api_session() as (auth_api, metadata_interceptor): 

294 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),)) 

295 

296 

297def test_login_part_signed_up_verified_email(db): 

298 """ 

299 If you try to log in but didn't finish singing up, we send you a new email and ask you to finish signing up. 

300 """ 

301 with auth_api_session() as (auth_api, metadata_interceptor): 

302 res = auth_api.SignupFlow( 

303 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")) 

304 ) 

305 

306 flow_token = res.flow_token 

307 assert res.need_verify_email 

308 

309 # verify the email 

310 with session_scope() as session: 

311 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

312 flow_token = flow.flow_token 

313 email_token = flow.email_token 

314 with auth_api_session() as (auth_api, metadata_interceptor): 

315 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

316 

317 with mock_notification_email() as mock: 

318 with auth_api_session() as (auth_api, metadata_interceptor): 

319 with pytest.raises(grpc.RpcError) as e: 

320 auth_api.Authenticate(auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd")) 

321 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP 

322 

323 assert mock.call_count == 1 

324 e = email_fields(mock) 

325 assert e.recipient == "email@couchers.org.invalid" 

326 assert flow_token in e.plain 

327 assert flow_token in e.html 

328 

329 

330def test_login_part_signed_up_not_verified_email(db): 

331 with auth_api_session() as (auth_api, metadata_interceptor): 

332 res = auth_api.SignupFlow( 

333 auth_pb2.SignupFlowReq( 

334 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

335 account=auth_pb2.SignupAccount( 

336 username="frodo", 

337 password="a very insecure password", 

338 birthdate="1999-01-01", 

339 gender="Bot", 

340 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

341 city="New York City", 

342 lat=40.7331, 

343 lng=-73.9778, 

344 radius=500, 

345 accept_tos=True, 

346 ), 

347 ) 

348 ) 

349 

350 flow_token = res.flow_token 

351 assert res.need_verify_email 

352 

353 with mock_notification_email() as mock: 

354 with auth_api_session() as (auth_api, metadata_interceptor): 

355 with pytest.raises(grpc.RpcError) as e: 

356 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd")) 

357 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP 

358 

359 with session_scope() as session: 

360 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

361 email_token = flow.email_token 

362 

363 assert mock.call_count == 1 

364 e = email_fields(mock) 

365 assert e.recipient == "email@couchers.org.invalid" 

366 assert email_token in e.plain 

367 assert email_token in e.html 

368 

369 

370def test_banned_user(db): 

371 _quick_signup() 

372 

373 with session_scope() as session: 

374 session.execute(select(User)).scalar_one().is_banned = True 

375 

376 with auth_api_session() as (auth_api, metadata_interceptor): 

377 with pytest.raises(grpc.RpcError) as e: 

378 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

379 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

380 assert e.value.details() == errors.ACCOUNT_SUSPENDED 

381 

382 

383def test_deleted_user(db): 

384 _quick_signup() 

385 

386 with session_scope() as session: 

387 session.execute(select(User)).scalar_one().is_deleted = True 

388 

389 with auth_api_session() as (auth_api, metadata_interceptor): 

390 with pytest.raises(grpc.RpcError) as e: 

391 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

392 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

393 assert e.value.details() == errors.ACCOUNT_NOT_FOUND 

394 

395 

396def test_invalid_token(db): 

397 user1, token1 = generate_user() 

398 user2, token2 = generate_user() 

399 

400 wrong_token = random_hex(32) 

401 

402 with real_api_session(wrong_token) as api, pytest.raises(grpc.RpcError) as e: 

403 res = api.GetUser(api_pb2.GetUserReq(user=user2.username)) 

404 

405 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED 

406 assert e.value.details() == "Unauthorized" 

407 

408 

409def test_password_reset_v2(db, push_collector): 

410 user, token = generate_user(hashed_password=hash_password("mypassword")) 

411 

412 with auth_api_session() as (auth_api, metadata_interceptor): 

413 with mock_notification_email() as mock: 

414 res = auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username)) 

415 

416 with session_scope() as session: 

417 password_reset_token = session.execute(select(PasswordResetToken)).scalar_one().token 

418 

419 assert mock.call_count == 1 

420 e = email_fields(mock) 

421 assert e.recipient == user.email 

422 assert "reset" in e.subject.lower() 

423 assert password_reset_token in e.plain 

424 assert password_reset_token in e.html 

425 unique_string = "You asked for your password to be reset on Couchers.org." 

426 assert unique_string in e.plain 

427 assert unique_string in e.html 

428 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.plain 

429 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.html 

430 assert "support@couchers.org" in e.plain 

431 assert "support@couchers.org" in e.html 

432 

433 push_collector.assert_user_push_matches_fields( 

434 user.id, 

435 title="A password reset was initiated on your account", 

436 body="Someone initiated a password change on your account.", 

437 ) 

438 

439 # make sure bad password are caught 

440 with auth_api_session() as (auth_api, metadata_interceptor): 

441 with pytest.raises(grpc.RpcError) as e: 

442 auth_api.CompletePasswordResetV2( 

443 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password") 

444 ) 

445 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

446 assert e.value.details() == errors.INSECURE_PASSWORD 

447 

448 # make sure we can set a good password 

449 with auth_api_session() as (auth_api, metadata_interceptor): 

450 pwd = random_hex() 

451 with mock_notification_email() as mock: 

452 res = auth_api.CompletePasswordResetV2( 

453 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd) 

454 ) 

455 

456 push_collector.assert_user_push_matches_fields( 

457 user.id, 

458 ix=1, 

459 title="Your password was successfully reset", 

460 body="Your password on Couchers.org was changed. If that was you, then no further action is needed.", 

461 ) 

462 

463 session_token, _ = get_session_cookie_tokens(metadata_interceptor) 

464 

465 with session_scope() as session: 

466 other_session_token = ( 

467 session.execute( 

468 select(UserSession) 

469 .join(User, UserSession.user_id == User.id) 

470 .where(User.username == user.username) 

471 .where(UserSession.token == session_token) 

472 .where(UserSession.is_valid) 

473 ).scalar_one_or_none() 

474 ).token 

475 assert other_session_token 

476 

477 # make sure we can't set a password again 

478 with auth_api_session() as (auth_api, metadata_interceptor): 

479 with pytest.raises(grpc.RpcError) as e: 

480 auth_api.CompletePasswordResetV2( 

481 auth_pb2.CompletePasswordResetV2Req( 

482 password_reset_token=password_reset_token, new_password=random_hex() 

483 ) 

484 ) 

485 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

486 assert e.value.details() == errors.INVALID_TOKEN 

487 

488 with session_scope() as session: 

489 user = session.execute(select(User)).scalar_one() 

490 assert user.hashed_password == hash_password(pwd) 

491 

492 

493def test_password_reset_no_such_user(db): 

494 user, token = generate_user() 

495 

496 with auth_api_session() as (auth_api, metadata_interceptor): 

497 res = auth_api.ResetPassword( 

498 auth_pb2.ResetPasswordReq( 

499 user="nonexistentuser", 

500 ) 

501 ) 

502 

503 with session_scope() as session: 

504 res = session.execute(select(PasswordResetToken)).scalar_one_or_none() 

505 

506 assert res is None 

507 

508 

509def test_password_reset_invalid_token_v2(db): 

510 password = random_hex() 

511 user, token = generate_user(hashed_password=hash_password(password)) 

512 

513 with auth_api_session() as (auth_api, metadata_interceptor): 

514 res = auth_api.ResetPassword( 

515 auth_pb2.ResetPasswordReq( 

516 user=user.username, 

517 ) 

518 ) 

519 

520 with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e: 

521 res = auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken")) 

522 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

523 assert e.value.details() == errors.INVALID_TOKEN 

524 

525 with session_scope() as session: 

526 user = session.execute(select(User)).scalar_one() 

527 assert user.hashed_password == hash_password(password) 

528 

529 

530def test_logout_invalid_token(db): 

531 # Create our test user using signup 

532 _quick_signup() 

533 

534 with auth_api_session() as (auth_api, metadata_interceptor): 

535 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

536 

537 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

538 

539 # delete all login tokens 

540 with session_scope() as session: 

541 session.execute(delete(LoginToken)) 

542 

543 # log out with non-existent token should still return a valid result 

544 with auth_api_session() as (auth_api, metadata_interceptor): 

545 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),)) 

546 

547 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

548 # make sure we set an empty cookie 

549 assert reply_token == "" 

550 

551 

552def test_signup_without_password(db): 

553 with auth_api_session() as (auth_api, metadata_interceptor): 

554 with pytest.raises(grpc.RpcError) as e: 

555 auth_api.SignupFlow( 

556 auth_pb2.SignupFlowReq( 

557 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"), 

558 account=auth_pb2.SignupAccount( 

559 username="frodo", 

560 password="bad", 

561 city="Minas Tirith", 

562 birthdate="9999-12-31", # arbitrary future birthdate 

563 gender="Robot", 

564 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

565 lat=1, 

566 lng=1, 

567 radius=100, 

568 accept_tos=True, 

569 ), 

570 feedback=auth_pb2.ContributorForm(), 

571 ) 

572 ) 

573 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

574 assert e.value.details() == errors.PASSWORD_TOO_SHORT 

575 

576 

577def test_signup_invalid_birthdate(db): 

578 with auth_api_session() as (auth_api, metadata_interceptor): 

579 with pytest.raises(grpc.RpcError) as e: 

580 auth_api.SignupFlow( 

581 auth_pb2.SignupFlowReq( 

582 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"), 

583 account=auth_pb2.SignupAccount( 

584 username="frodo", 

585 password="a very insecure password", 

586 city="Minas Tirith", 

587 birthdate="9999-12-31", # arbitrary future birthdate 

588 gender="Robot", 

589 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

590 lat=1, 

591 lng=1, 

592 radius=100, 

593 accept_tos=True, 

594 ), 

595 feedback=auth_pb2.ContributorForm(), 

596 ) 

597 ) 

598 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

599 assert e.value.details() == errors.INVALID_BIRTHDATE 

600 

601 res = auth_api.SignupFlow( 

602 auth_pb2.SignupFlowReq( 

603 basic=auth_pb2.SignupBasic(name="Christopher", email="a2@b.com"), 

604 account=auth_pb2.SignupAccount( 

605 username="ceelo", 

606 password="a very insecure password", 

607 city="New York City", 

608 birthdate="2000-12-31", # arbitrary birthdate older than 18 years 

609 gender="Helicopter", 

610 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

611 lat=1, 

612 lng=1, 

613 radius=100, 

614 accept_tos=True, 

615 ), 

616 feedback=auth_pb2.ContributorForm(), 

617 ) 

618 ) 

619 

620 assert res.flow_token 

621 

622 with pytest.raises(grpc.RpcError) as e: 

623 auth_api.SignupFlow( 

624 auth_pb2.SignupFlowReq( 

625 basic=auth_pb2.SignupBasic(name="Franklin", email="a3@b.com"), 

626 account=auth_pb2.SignupAccount( 

627 username="franklin", 

628 password="a very insecure password", 

629 city="Los Santos", 

630 birthdate="2010-04-09", # arbitrary birthdate < 18 yrs 

631 gender="Male", 

632 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

633 lat=1, 

634 lng=1, 

635 radius=100, 

636 accept_tos=True, 

637 ), 

638 feedback=auth_pb2.ContributorForm(), 

639 ) 

640 ) 

641 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

642 assert e.value.details() == errors.INVALID_BIRTHDATE 

643 

644 with session_scope() as session: 

645 assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1 

646 

647 

648def test_signup_invalid_email(db): 

649 with auth_api_session() as (auth_api, metadata_interceptor): 

650 with pytest.raises(grpc.RpcError) as e: 

651 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a"))) 

652 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

653 assert e.value.details() == errors.INVALID_EMAIL 

654 

655 with auth_api_session() as (auth_api, metadata_interceptor): 

656 with pytest.raises(grpc.RpcError) as e: 

657 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b"))) 

658 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

659 assert e.value.details() == errors.INVALID_EMAIL 

660 

661 with auth_api_session() as (auth_api, metadata_interceptor): 

662 with pytest.raises(grpc.RpcError) as e: 

663 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b."))) 

664 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

665 assert e.value.details() == errors.INVALID_EMAIL 

666 

667 with auth_api_session() as (auth_api, metadata_interceptor): 

668 with pytest.raises(grpc.RpcError) as e: 

669 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b.c"))) 

670 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

671 assert e.value.details() == errors.INVALID_EMAIL 

672 

673 

674def test_signup_existing_email(db): 

675 # Signed up user 

676 user, _ = generate_user() 

677 

678 with auth_api_session() as (auth_api, metadata_interceptor): 

679 with pytest.raises(grpc.RpcError) as e: 

680 reply = auth_api.SignupFlow( 

681 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=user.email)) 

682 ) 

683 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

684 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_TAKEN 

685 

686 

687def test_signup_continue_with_email(db): 

688 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

689 with auth_api_session() as (auth_api, metadata_interceptor): 

690 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

691 flow_token = res.flow_token 

692 assert flow_token 

693 

694 # continue with same email, should just send another email to the user 

695 with auth_api_session() as (auth_api, metadata_interceptor): 

696 with pytest.raises(grpc.RpcError) as e: 

697 res = auth_api.SignupFlow( 

698 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)) 

699 ) 

700 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

701 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP 

702 

703 

704def test_signup_resend_email(db): 

705 with auth_api_session() as (auth_api, metadata_interceptor): 

706 with mock_notification_email() as mock: 

707 res = auth_api.SignupFlow( 

708 auth_pb2.SignupFlowReq( 

709 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

710 account=auth_pb2.SignupAccount( 

711 username="frodo", 

712 password="a very insecure password", 

713 birthdate="1970-01-01", 

714 gender="Bot", 

715 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

716 city="New York City", 

717 lat=40.7331, 

718 lng=-73.9778, 

719 radius=500, 

720 accept_tos=True, 

721 ), 

722 feedback=auth_pb2.ContributorForm(), 

723 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

724 ) 

725 ) 

726 assert mock.call_count == 1 

727 e = email_fields(mock) 

728 assert e.recipient == "email@couchers.org.invalid" 

729 

730 flow_token = res.flow_token 

731 assert flow_token 

732 

733 with session_scope() as session: 

734 flow = session.execute(select(SignupFlow)).scalar_one() 

735 assert flow.flow_token == flow_token 

736 assert flow.email_sent 

737 assert not flow.email_verified 

738 email_token = flow.email_token 

739 

740 # ask for a new signup email 

741 with auth_api_session() as (auth_api, metadata_interceptor): 

742 with mock_notification_email() as mock: 

743 res = auth_api.SignupFlow( 

744 auth_pb2.SignupFlowReq( 

745 flow_token=flow_token, 

746 resend_verification_email=True, 

747 ) 

748 ) 

749 assert mock.call_count == 1 

750 e = email_fields(mock) 

751 assert e.recipient == "email@couchers.org.invalid" 

752 assert email_token in e.plain 

753 assert email_token in e.html 

754 

755 with session_scope() as session: 

756 flow = session.execute(select(SignupFlow)).scalar_one() 

757 assert not flow.email_verified 

758 

759 with auth_api_session() as (auth_api, metadata_interceptor): 

760 res = auth_api.SignupFlow( 

761 auth_pb2.SignupFlowReq( 

762 email_token=email_token, 

763 ) 

764 ) 

765 

766 assert not res.flow_token 

767 assert res.HasField("auth_res") 

768 

769 

770def test_successful_authenticate(db): 

771 user, _ = generate_user(hashed_password=hash_password("password")) 

772 

773 # Authenticate with username 

774 with auth_api_session() as (auth_api, metadata_interceptor): 

775 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="password")) 

776 assert not reply.jailed 

777 

778 # Authenticate with email 

779 with auth_api_session() as (auth_api, metadata_interceptor): 

780 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.email, password="password")) 

781 assert not reply.jailed 

782 

783 

784def test_unsuccessful_authenticate(db): 

785 user, _ = generate_user(hashed_password=hash_password("password")) 

786 

787 # Invalid password 

788 with auth_api_session() as (auth_api, metadata_interceptor): 

789 with pytest.raises(grpc.RpcError) as e: 

790 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="incorrectpassword")) 

791 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

792 assert e.value.details() == errors.INVALID_PASSWORD 

793 

794 # Invalid username 

795 with auth_api_session() as (auth_api, metadata_interceptor): 

796 with pytest.raises(grpc.RpcError) as e: 

797 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="notarealusername", password="password")) 

798 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

799 assert e.value.details() == errors.ACCOUNT_NOT_FOUND 

800 

801 # Invalid email 

802 with auth_api_session() as (auth_api, metadata_interceptor): 

803 with pytest.raises(grpc.RpcError) as e: 

804 reply = auth_api.Authenticate( 

805 auth_pb2.AuthReq(user=f"{random_hex(12)}@couchers.org.invalid", password="password") 

806 ) 

807 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

808 assert e.value.details() == errors.ACCOUNT_NOT_FOUND 

809 

810 # Invalid id 

811 with auth_api_session() as (auth_api, metadata_interceptor): 

812 with pytest.raises(grpc.RpcError) as e: 

813 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="-1", password="password")) 

814 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

815 assert e.value.details() == errors.ACCOUNT_NOT_FOUND 

816 

817 

818def test_complete_signup(db): 

819 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

820 with auth_api_session() as (auth_api, metadata_interceptor): 

821 reply = auth_api.SignupFlow( 

822 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email)) 

823 ) 

824 

825 flow_token = reply.flow_token 

826 

827 with auth_api_session() as (auth_api, metadata_interceptor): 

828 # Invalid username 

829 with pytest.raises(grpc.RpcError) as e: 

830 reply = auth_api.SignupFlow( 

831 auth_pb2.SignupFlowReq( 

832 flow_token=flow_token, 

833 account=auth_pb2.SignupAccount( 

834 username=" ", 

835 password="a very insecure password", 

836 city="Minas Tirith", 

837 birthdate="1980-12-31", 

838 gender="Robot", 

839 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

840 lat=1, 

841 lng=1, 

842 radius=100, 

843 accept_tos=True, 

844 ), 

845 ) 

846 ) 

847 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

848 assert e.value.details() == errors.INVALID_USERNAME 

849 

850 with auth_api_session() as (auth_api, metadata_interceptor): 

851 # Invalid name 

852 with pytest.raises(grpc.RpcError) as e: 

853 reply = auth_api.SignupFlow( 

854 auth_pb2.SignupFlowReq( 

855 basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid") 

856 ) 

857 ) 

858 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

859 assert e.value.details() == errors.INVALID_NAME 

860 

861 with auth_api_session() as (auth_api, metadata_interceptor): 

862 # Hosting status required 

863 with pytest.raises(grpc.RpcError) as e: 

864 reply = auth_api.SignupFlow( 

865 auth_pb2.SignupFlowReq( 

866 flow_token=flow_token, 

867 account=auth_pb2.SignupAccount( 

868 username="frodo", 

869 password="a very insecure password", 

870 city="Minas Tirith", 

871 birthdate="1980-12-31", 

872 gender="Robot", 

873 hosting_status=None, 

874 lat=1, 

875 lng=1, 

876 radius=100, 

877 accept_tos=True, 

878 ), 

879 ) 

880 ) 

881 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

882 assert e.value.details() == errors.HOSTING_STATUS_REQUIRED 

883 

884 user, _ = generate_user() 

885 with auth_api_session() as (auth_api, metadata_interceptor): 

886 # Username unavailable 

887 with pytest.raises(grpc.RpcError) as e: 

888 reply = auth_api.SignupFlow( 

889 auth_pb2.SignupFlowReq( 

890 flow_token=flow_token, 

891 account=auth_pb2.SignupAccount( 

892 username=user.username, 

893 password="a very insecure password", 

894 city="Minas Tirith", 

895 birthdate="1980-12-31", 

896 gender="Robot", 

897 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

898 lat=1, 

899 lng=1, 

900 radius=100, 

901 accept_tos=True, 

902 ), 

903 ) 

904 ) 

905 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

906 assert e.value.details() == errors.USERNAME_NOT_AVAILABLE 

907 

908 with auth_api_session() as (auth_api, metadata_interceptor): 

909 # Invalid coordinate 

910 with pytest.raises(grpc.RpcError) as e: 

911 reply = auth_api.SignupFlow( 

912 auth_pb2.SignupFlowReq( 

913 flow_token=flow_token, 

914 account=auth_pb2.SignupAccount( 

915 username="frodo", 

916 password="a very insecure password", 

917 city="Minas Tirith", 

918 birthdate="1980-12-31", 

919 gender="Robot", 

920 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

921 lat=0, 

922 lng=0, 

923 radius=100, 

924 accept_tos=True, 

925 ), 

926 ) 

927 ) 

928 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

929 assert e.value.details() == errors.INVALID_COORDINATE 

930 

931 

932def test_signup_token_regression(db): 

933 # Repro steps: 

934 # 1. Start a signup 

935 # 2. Confirm the email 

936 # 3. Start a new signup with the same email 

937 # Expected: send a link to the email to continue signing up. 

938 # Actual: `AttributeError: 'SignupFlow' object has no attribute 'token'` 

939 

940 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

941 

942 # 1. Start a signup 

943 with auth_api_session() as (auth_api, metadata_interceptor): 

944 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

945 flow_token = res.flow_token 

946 assert flow_token 

947 

948 # 2. Confirm the email 

949 with session_scope() as session: 

950 email_token = ( 

951 session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one().email_token 

952 ) 

953 

954 with auth_api_session() as (auth_api, metadata_interceptor): 

955 res = auth_api.SignupFlow( 

956 auth_pb2.SignupFlowReq( 

957 flow_token=flow_token, 

958 email_token=email_token, 

959 ) 

960 ) 

961 

962 # 3. Start a new signup with the same email 

963 with auth_api_session() as (auth_api, metadata_interceptor): 

964 with pytest.raises(grpc.RpcError) as e: 

965 res = auth_api.SignupFlow( 

966 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)) 

967 ) 

968 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

969 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP 

970 

971 

972@pytest.mark.parametrize("opt_out", [True, False]) 

973def test_opt_out_of_newsletter(db, opt_out): 

974 with auth_api_session() as (auth_api, metadata_interceptor): 

975 res = auth_api.SignupFlow( 

976 auth_pb2.SignupFlowReq( 

977 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

978 account=auth_pb2.SignupAccount( 

979 username="frodo", 

980 password="a very insecure password", 

981 birthdate="1970-01-01", 

982 gender="Bot", 

983 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

984 city="New York City", 

985 lat=40.7331, 

986 lng=-73.9778, 

987 radius=500, 

988 accept_tos=True, 

989 opt_out_of_newsletter=opt_out, 

990 ), 

991 feedback=auth_pb2.ContributorForm(), 

992 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

993 ) 

994 ) 

995 

996 with session_scope() as session: 

997 email_token = ( 

998 session.execute(select(SignupFlow).where(SignupFlow.flow_token == res.flow_token)).scalar_one().email_token 

999 ) 

1000 

1001 with auth_api_session() as (auth_api, metadata_interceptor): 

1002 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1003 

1004 user_id = res.auth_res.user_id 

1005 

1006 with session_scope() as session: 

1007 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

1008 assert not user.in_sync_with_newsletter 

1009 assert user.opt_out_of_newsletter == opt_out 

1010 

1011 

1012def test_GetAuthState(db): 

1013 user, token = generate_user() 

1014 jailed_user, jailed_token = generate_user(accepted_tos=0) 

1015 

1016 with auth_api_session() as (auth_api, metadata_interceptor): 

1017 res = auth_api.GetAuthState(empty_pb2.Empty()) 

1018 assert not res.logged_in 

1019 assert not res.HasField("auth_res") 

1020 

1021 with auth_api_session() as (auth_api, metadata_interceptor): 

1022 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1023 assert res.logged_in 

1024 assert res.HasField("auth_res") 

1025 assert res.auth_res.user_id == user.id 

1026 assert not res.auth_res.jailed 

1027 

1028 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1029 

1030 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1031 assert not res.logged_in 

1032 assert not res.HasField("auth_res") 

1033 

1034 with auth_api_session() as (auth_api, metadata_interceptor): 

1035 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={jailed_token}"),)) 

1036 assert res.logged_in 

1037 assert res.HasField("auth_res") 

1038 assert res.auth_res.user_id == jailed_user.id 

1039 assert res.auth_res.jailed 

1040 

1041 

1042def test_signup_no_feedback_regression(db): 

1043 """ 

1044 When we first remove the feedback form, the backned was saying it's not needed but was not completing the signup, 

1045 this regression test checks that. 

1046 """ 

1047 with auth_api_session() as (auth_api, metadata_interceptor): 

1048 res = auth_api.SignupFlow( 

1049 auth_pb2.SignupFlowReq( 

1050 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

1051 account=auth_pb2.SignupAccount( 

1052 username="frodo", 

1053 password="a very insecure password", 

1054 birthdate="1970-01-01", 

1055 gender="Bot", 

1056 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1057 city="New York City", 

1058 lat=40.7331, 

1059 lng=-73.9778, 

1060 radius=500, 

1061 accept_tos=True, 

1062 ), 

1063 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1064 ) 

1065 ) 

1066 

1067 flow_token = res.flow_token 

1068 

1069 assert res.flow_token 

1070 assert not res.HasField("auth_res") 

1071 assert not res.need_basic 

1072 assert not res.need_account 

1073 assert not res.need_feedback 

1074 assert res.need_verify_email 

1075 

1076 # read out the signup token directly from the database for now 

1077 with session_scope() as session: 

1078 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

1079 assert flow.email_sent 

1080 assert not flow.email_verified 

1081 email_token = flow.email_token 

1082 

1083 with auth_api_session() as (auth_api, metadata_interceptor): 

1084 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1085 

1086 assert not res.flow_token 

1087 assert res.HasField("auth_res") 

1088 assert res.auth_res.user_id 

1089 assert not res.auth_res.jailed 

1090 assert not res.need_basic 

1091 assert not res.need_account 

1092 assert not res.need_feedback 

1093 assert not res.need_verify_email 

1094 

1095 # make sure we got the right token in a cookie 

1096 with session_scope() as session: 

1097 token = ( 

1098 session.execute( 

1099 select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo") 

1100 ).scalar_one() 

1101 ).token 

1102 sesh, uid = get_session_cookie_tokens(metadata_interceptor) 

1103 assert sesh == token 

1104 

1105 

1106def test_banned_username(db): 

1107 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

1108 with auth_api_session() as (auth_api, metadata_interceptor): 

1109 reply = auth_api.SignupFlow( 

1110 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email)) 

1111 ) 

1112 

1113 flow_token = reply.flow_token 

1114 

1115 with auth_api_session() as (auth_api, metadata_interceptor): 

1116 # Banned username 

1117 with pytest.raises(grpc.RpcError) as e: 

1118 reply = auth_api.SignupFlow( 

1119 auth_pb2.SignupFlowReq( 

1120 flow_token=flow_token, 

1121 account=auth_pb2.SignupAccount( 

1122 username="thecouchersadminaccount", 

1123 password="a very insecure password", 

1124 city="Minas Tirith", 

1125 birthdate="1980-12-31", 

1126 gender="Robot", 

1127 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1128 lat=1, 

1129 lng=1, 

1130 radius=100, 

1131 accept_tos=True, 

1132 ), 

1133 ) 

1134 ) 

1135 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1136 assert e.value.details() == errors.USERNAME_NOT_AVAILABLE 

1137 

1138 

1139# tests for ConfirmChangeEmail within test_account.py tests for test_ChangeEmail_*