Coverage for src/tests/test_auth.py: 100%

488 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import http.cookies 

2 

3import grpc 

4import pytest 

5from google.protobuf import empty_pb2, wrappers_pb2 

6from sqlalchemy.sql import delete, func 

7 

8from couchers import errors 

9from couchers.crypto import hash_password, random_hex 

10from couchers.db import session_scope 

11from couchers.models import ( 

12 ContributeOption, 

13 ContributorForm, 

14 LoginToken, 

15 PasswordResetToken, 

16 SignupFlow, 

17 User, 

18 UserSession, 

19) 

20from couchers.sql import couchers_select as select 

21from proto import api_pb2, auth_pb2 

22from tests.test_fixtures import ( # noqa 

23 api_session, 

24 auth_api_session, 

25 db, 

26 email_fields, 

27 fast_passwords, 

28 generate_user, 

29 mock_notification_email, 

30 push_collector, 

31 real_api_session, 

32 testconfig, 

33) 

34 

35 

36@pytest.fixture(autouse=True) 

37def _(testconfig, fast_passwords): 

38 pass 

39 

40 

41def get_session_cookie_tokens(metadata_interceptor): 

42 set_cookies = [val for key, val in metadata_interceptor.latest_header_raw if key == "set-cookie"] 

43 sesh = http.cookies.SimpleCookie([v for v in set_cookies if "sesh" in v][0])["couchers-sesh"].value 

44 uid = http.cookies.SimpleCookie([v for v in set_cookies if "user-id" in v][0])["couchers-user-id"].value 

45 return sesh, uid 

46 

47 

48def test_UsernameValid(db): 

49 with auth_api_session() as (auth_api, metadata_interceptor): 

50 assert auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test")).valid 

51 

52 with auth_api_session() as (auth_api, metadata_interceptor): 

53 assert not auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="")).valid 

54 

55 

56def test_signup_incremental(db): 

57 with auth_api_session() as (auth_api, metadata_interceptor): 

58 res = auth_api.SignupFlow( 

59 auth_pb2.SignupFlowReq( 

60 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

61 ) 

62 ) 

63 

64 flow_token = res.flow_token 

65 assert res.flow_token 

66 assert not res.HasField("auth_res") 

67 assert not res.need_basic 

68 assert res.need_account 

69 assert res.need_feedback 

70 assert res.need_verify_email 

71 assert res.need_accept_community_guidelines 

72 

73 # read out the signup token directly from the database for now 

74 with session_scope() as session: 

75 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

76 assert flow.email_sent 

77 assert not flow.email_verified 

78 email_token = flow.email_token 

79 

80 with auth_api_session() as (auth_api, metadata_interceptor): 

81 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token)) 

82 

83 assert res.flow_token == flow_token 

84 assert not res.HasField("auth_res") 

85 assert not res.need_basic 

86 assert res.need_account 

87 assert res.need_feedback 

88 assert res.need_verify_email 

89 assert res.need_accept_community_guidelines 

90 

91 # Add feedback 

92 with auth_api_session() as (auth_api, metadata_interceptor): 

93 res = auth_api.SignupFlow( 

94 auth_pb2.SignupFlowReq( 

95 flow_token=flow_token, 

96 feedback=auth_pb2.ContributorForm( 

97 ideas="I'm a robot, incapable of original ideation", 

98 features="I love all your features", 

99 experience="I haven't done couch surfing before", 

100 contribute=auth_pb2.CONTRIBUTE_OPTION_YES, 

101 contribute_ways=["serving", "backend"], 

102 expertise="I'd love to be your server: I can compute very fast, but only simple opcodes", 

103 ), 

104 ) 

105 ) 

106 

107 assert res.flow_token == flow_token 

108 assert not res.HasField("auth_res") 

109 assert not res.need_basic 

110 assert res.need_account 

111 assert not res.need_feedback 

112 assert res.need_verify_email 

113 assert res.need_accept_community_guidelines 

114 

115 # Agree to community guidelines 

116 with auth_api_session() as (auth_api, metadata_interceptor): 

117 res = auth_api.SignupFlow( 

118 auth_pb2.SignupFlowReq( 

119 flow_token=flow_token, 

120 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

121 ) 

122 ) 

123 

124 assert res.flow_token == flow_token 

125 assert not res.HasField("auth_res") 

126 assert not res.need_basic 

127 assert res.need_account 

128 assert not res.need_feedback 

129 assert res.need_verify_email 

130 assert not res.need_accept_community_guidelines 

131 

132 # Verify email 

133 with auth_api_session() as (auth_api, metadata_interceptor): 

134 res = auth_api.SignupFlow( 

135 auth_pb2.SignupFlowReq( 

136 flow_token=flow_token, 

137 email_token=email_token, 

138 ) 

139 ) 

140 

141 assert res.flow_token == flow_token 

142 assert not res.HasField("auth_res") 

143 assert not res.need_basic 

144 assert res.need_account 

145 assert not res.need_feedback 

146 assert not res.need_verify_email 

147 assert not res.need_accept_community_guidelines 

148 

149 # Finally finish off account info 

150 with auth_api_session() as (auth_api, metadata_interceptor): 

151 res = auth_api.SignupFlow( 

152 auth_pb2.SignupFlowReq( 

153 flow_token=flow_token, 

154 account=auth_pb2.SignupAccount( 

155 username="frodo", 

156 password="a very insecure password", 

157 birthdate="1970-01-01", 

158 gender="Bot", 

159 hosting_status=api_pb2.HOSTING_STATUS_MAYBE, 

160 city="New York City", 

161 lat=40.7331, 

162 lng=-73.9778, 

163 radius=500, 

164 accept_tos=True, 

165 ), 

166 ) 

167 ) 

168 

169 assert not res.flow_token 

170 assert res.HasField("auth_res") 

171 assert res.auth_res.user_id 

172 assert not res.auth_res.jailed 

173 assert not res.need_basic 

174 assert not res.need_account 

175 assert not res.need_feedback 

176 assert not res.need_verify_email 

177 assert not res.need_accept_community_guidelines 

178 

179 user_id = res.auth_res.user_id 

180 

181 sess_token, uid = get_session_cookie_tokens(metadata_interceptor) 

182 assert uid == str(user_id) 

183 

184 with api_session(sess_token) as api: 

185 res = api.GetUser(api_pb2.GetUserReq(user=str(user_id))) 

186 

187 assert res.username == "frodo" 

188 assert res.gender == "Bot" 

189 assert res.hosting_status == api_pb2.HOSTING_STATUS_MAYBE 

190 assert res.city == "New York City" 

191 assert res.lat == 40.7331 

192 assert res.lng == -73.9778 

193 assert res.radius == 500 

194 

195 with session_scope() as session: 

196 form = session.execute(select(ContributorForm)).scalar_one() 

197 

198 assert form.ideas == "I'm a robot, incapable of original ideation" 

199 assert form.features == "I love all your features" 

200 assert form.experience == "I haven't done couch surfing before" 

201 assert form.contribute == ContributeOption.yes 

202 assert form.contribute_ways == ["serving", "backend"] 

203 assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes" 

204 

205 

206def _quick_signup(): 

207 with auth_api_session() as (auth_api, metadata_interceptor): 

208 res = auth_api.SignupFlow( 

209 auth_pb2.SignupFlowReq( 

210 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

211 account=auth_pb2.SignupAccount( 

212 username="frodo", 

213 password="a very insecure password", 

214 birthdate="1970-01-01", 

215 gender="Bot", 

216 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

217 city="New York City", 

218 lat=40.7331, 

219 lng=-73.9778, 

220 radius=500, 

221 accept_tos=True, 

222 ), 

223 feedback=auth_pb2.ContributorForm(), 

224 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

225 ) 

226 ) 

227 

228 flow_token = res.flow_token 

229 

230 assert res.flow_token 

231 assert not res.HasField("auth_res") 

232 assert not res.need_basic 

233 assert not res.need_account 

234 assert not res.need_feedback 

235 assert res.need_verify_email 

236 

237 # read out the signup token directly from the database for now 

238 with session_scope() as session: 

239 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

240 assert flow.email_sent 

241 assert not flow.email_verified 

242 email_token = flow.email_token 

243 

244 with auth_api_session() as (auth_api, metadata_interceptor): 

245 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

246 

247 assert not res.flow_token 

248 assert res.HasField("auth_res") 

249 assert res.auth_res.user_id 

250 assert not res.auth_res.jailed 

251 assert not res.need_basic 

252 assert not res.need_account 

253 assert not res.need_feedback 

254 assert not res.need_verify_email 

255 

256 # make sure we got the right token in a cookie 

257 with session_scope() as session: 

258 token = ( 

259 session.execute( 

260 select(UserSession).join(User, UserSession.user_id == User.id).where(User.username == "frodo") 

261 ).scalar_one() 

262 ).token 

263 sesh, uid = get_session_cookie_tokens(metadata_interceptor) 

264 assert sesh == token 

265 

266 

267def test_signup(db): 

268 _quick_signup() 

269 

270 

271def test_basic_login(db): 

272 # Create our test user using signup 

273 _quick_signup() 

274 

275 with auth_api_session() as (auth_api, metadata_interceptor): 

276 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

277 

278 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

279 

280 with session_scope() as session: 

281 token = ( 

282 session.execute( 

283 select(UserSession) 

284 .join(User, UserSession.user_id == User.id) 

285 .where(User.username == "frodo") 

286 .where(UserSession.token == reply_token) 

287 .where(UserSession.is_valid) 

288 ).scalar_one_or_none() 

289 ).token 

290 assert token 

291 

292 # log out 

293 with auth_api_session() as (auth_api, metadata_interceptor): 

294 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),)) 

295 

296 

297def test_login_part_signed_up_verified_email(db): 

298 """ 

299 If you try to log in but didn't finish singing up, we send you a new email and ask you to finish signing up. 

300 """ 

301 with auth_api_session() as (auth_api, metadata_interceptor): 

302 res = auth_api.SignupFlow( 

303 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")) 

304 ) 

305 

306 flow_token = res.flow_token 

307 assert res.need_verify_email 

308 

309 # verify the email 

310 with session_scope() as session: 

311 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

312 flow_token = flow.flow_token 

313 email_token = flow.email_token 

314 with auth_api_session() as (auth_api, metadata_interceptor): 

315 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

316 

317 with mock_notification_email() as mock: 

318 with auth_api_session() as (auth_api, metadata_interceptor): 

319 with pytest.raises(grpc.RpcError) as e: 

320 auth_api.Authenticate(auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd")) 

321 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP 

322 

323 assert mock.call_count == 1 

324 e = email_fields(mock) 

325 assert e.recipient == "email@couchers.org.invalid" 

326 assert flow_token in e.plain 

327 assert flow_token in e.html 

328 

329 

330def test_login_part_signed_up_not_verified_email(db): 

331 with auth_api_session() as (auth_api, metadata_interceptor): 

332 res = auth_api.SignupFlow( 

333 auth_pb2.SignupFlowReq( 

334 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

335 account=auth_pb2.SignupAccount( 

336 username="frodo", 

337 password="a very insecure password", 

338 birthdate="1999-01-01", 

339 gender="Bot", 

340 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

341 city="New York City", 

342 lat=40.7331, 

343 lng=-73.9778, 

344 radius=500, 

345 accept_tos=True, 

346 ), 

347 ) 

348 ) 

349 

350 flow_token = res.flow_token 

351 assert res.need_verify_email 

352 

353 with mock_notification_email() as mock: 

354 with auth_api_session() as (auth_api, metadata_interceptor): 

355 with pytest.raises(grpc.RpcError) as e: 

356 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd")) 

357 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP 

358 

359 with session_scope() as session: 

360 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

361 email_token = flow.email_token 

362 

363 assert mock.call_count == 1 

364 e = email_fields(mock) 

365 assert e.recipient == "email@couchers.org.invalid" 

366 assert email_token in e.plain 

367 assert email_token in e.html 

368 

369 

370def test_banned_user(db): 

371 _quick_signup() 

372 

373 with session_scope() as session: 

374 session.execute(select(User)).scalar_one().is_banned = True 

375 

376 with auth_api_session() as (auth_api, metadata_interceptor): 

377 with pytest.raises(grpc.RpcError) as e: 

378 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

379 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

380 assert e.value.details() == errors.ACCOUNT_SUSPENDED 

381 

382 

383def test_deleted_user(db): 

384 _quick_signup() 

385 

386 with session_scope() as session: 

387 session.execute(select(User)).scalar_one().is_deleted = True 

388 

389 with auth_api_session() as (auth_api, metadata_interceptor): 

390 with pytest.raises(grpc.RpcError) as e: 

391 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

392 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

393 assert e.value.details() == errors.ACCOUNT_NOT_FOUND 

394 

395 

396def test_invalid_token(db): 

397 user1, token1 = generate_user() 

398 user2, token2 = generate_user() 

399 

400 wrong_token = random_hex(32) 

401 

402 with real_api_session(wrong_token) as api, pytest.raises(grpc.RpcError) as e: 

403 res = api.GetUser(api_pb2.GetUserReq(user=user2.username)) 

404 

405 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED 

406 assert e.value.details() == "Unauthorized" 

407 

408 

409def test_password_reset_v2(db, push_collector): 

410 user, token = generate_user(hashed_password=hash_password("mypassword")) 

411 

412 with auth_api_session() as (auth_api, metadata_interceptor): 

413 with mock_notification_email() as mock: 

414 res = auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username)) 

415 

416 with session_scope() as session: 

417 password_reset_token = session.execute(select(PasswordResetToken)).scalar_one().token 

418 

419 assert mock.call_count == 1 

420 e = email_fields(mock) 

421 assert e.recipient == user.email 

422 assert "reset" in e.subject.lower() 

423 assert password_reset_token in e.plain 

424 assert password_reset_token in e.html 

425 unique_string = "You asked for your password to be reset on Couchers.org." 

426 assert unique_string in e.plain 

427 assert unique_string in e.html 

428 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.plain 

429 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.html 

430 assert "support@couchers.org" in e.plain 

431 assert "support@couchers.org" in e.html 

432 

433 push_collector.assert_user_push_matches_fields( 

434 user.id, 

435 title="A password reset was initiated on your account", 

436 body="Someone initiated a password change on your account.", 

437 ) 

438 

439 # make sure bad password are caught 

440 with auth_api_session() as (auth_api, metadata_interceptor): 

441 with pytest.raises(grpc.RpcError) as e: 

442 auth_api.CompletePasswordResetV2( 

443 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password") 

444 ) 

445 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

446 assert e.value.details() == errors.INSECURE_PASSWORD 

447 

448 # make sure we can set a good password 

449 with auth_api_session() as (auth_api, metadata_interceptor): 

450 pwd = random_hex() 

451 with mock_notification_email() as mock: 

452 res = auth_api.CompletePasswordResetV2( 

453 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd) 

454 ) 

455 

456 push_collector.assert_user_push_matches_fields( 

457 user.id, 

458 ix=1, 

459 title="Your password was successfully reset", 

460 body="Your password on Couchers.org was changed. If that was you, then no further action is needed.", 

461 ) 

462 

463 session_token, _ = get_session_cookie_tokens(metadata_interceptor) 

464 

465 with session_scope() as session: 

466 other_session_token = ( 

467 session.execute( 

468 select(UserSession) 

469 .join(User, UserSession.user_id == User.id) 

470 .where(User.username == user.username) 

471 .where(UserSession.token == session_token) 

472 .where(UserSession.is_valid) 

473 ).scalar_one_or_none() 

474 ).token 

475 assert other_session_token 

476 

477 # make sure we can't set a password again 

478 with auth_api_session() as (auth_api, metadata_interceptor): 

479 with pytest.raises(grpc.RpcError) as e: 

480 auth_api.CompletePasswordResetV2( 

481 auth_pb2.CompletePasswordResetV2Req( 

482 password_reset_token=password_reset_token, new_password=random_hex() 

483 ) 

484 ) 

485 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

486 assert e.value.details() == errors.INVALID_TOKEN 

487 

488 with session_scope() as session: 

489 user = session.execute(select(User)).scalar_one() 

490 assert user.hashed_password == hash_password(pwd) 

491 

492 

493def test_password_reset_no_such_user(db): 

494 user, token = generate_user() 

495 

496 with auth_api_session() as (auth_api, metadata_interceptor): 

497 res = auth_api.ResetPassword( 

498 auth_pb2.ResetPasswordReq( 

499 user="nonexistentuser", 

500 ) 

501 ) 

502 

503 with session_scope() as session: 

504 res = session.execute(select(PasswordResetToken)).scalar_one_or_none() 

505 

506 assert res is None 

507 

508 

509def test_password_reset_invalid_token_v2(db): 

510 password = random_hex() 

511 user, token = generate_user(hashed_password=hash_password(password)) 

512 

513 with auth_api_session() as (auth_api, metadata_interceptor): 

514 res = auth_api.ResetPassword( 

515 auth_pb2.ResetPasswordReq( 

516 user=user.username, 

517 ) 

518 ) 

519 

520 with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e: 

521 res = auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken")) 

522 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

523 assert e.value.details() == errors.INVALID_TOKEN 

524 

525 with session_scope() as session: 

526 user = session.execute(select(User)).scalar_one() 

527 assert user.hashed_password == hash_password(password) 

528 

529 

530def test_logout_invalid_token(db): 

531 # Create our test user using signup 

532 _quick_signup() 

533 

534 with auth_api_session() as (auth_api, metadata_interceptor): 

535 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

536 

537 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

538 

539 # delete all login tokens 

540 with session_scope() as session: 

541 session.execute(delete(LoginToken)) 

542 

543 # log out with non-existent token should still return a valid result 

544 with auth_api_session() as (auth_api, metadata_interceptor): 

545 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),)) 

546 

547 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

548 # make sure we set an empty cookie 

549 assert reply_token == "" 

550 

551 

552def test_signup_without_password(db): 

553 with auth_api_session() as (auth_api, metadata_interceptor): 

554 with pytest.raises(grpc.RpcError) as e: 

555 auth_api.SignupFlow( 

556 auth_pb2.SignupFlowReq( 

557 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"), 

558 account=auth_pb2.SignupAccount( 

559 username="frodo", 

560 password="bad", 

561 city="Minas Tirith", 

562 birthdate="9999-12-31", # arbitrary future birthdate 

563 gender="Robot", 

564 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

565 lat=1, 

566 lng=1, 

567 radius=100, 

568 accept_tos=True, 

569 ), 

570 feedback=auth_pb2.ContributorForm(), 

571 ) 

572 ) 

573 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

574 assert e.value.details() == errors.PASSWORD_TOO_SHORT 

575 

576 

577def test_signup_invalid_birthdate(db): 

578 with auth_api_session() as (auth_api, metadata_interceptor): 

579 with pytest.raises(grpc.RpcError) as e: 

580 auth_api.SignupFlow( 

581 auth_pb2.SignupFlowReq( 

582 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"), 

583 account=auth_pb2.SignupAccount( 

584 username="frodo", 

585 password="a very insecure password", 

586 city="Minas Tirith", 

587 birthdate="9999-12-31", # arbitrary future birthdate 

588 gender="Robot", 

589 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

590 lat=1, 

591 lng=1, 

592 radius=100, 

593 accept_tos=True, 

594 ), 

595 feedback=auth_pb2.ContributorForm(), 

596 ) 

597 ) 

598 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

599 assert e.value.details() == errors.INVALID_BIRTHDATE 

600 

601 res = auth_api.SignupFlow( 

602 auth_pb2.SignupFlowReq( 

603 basic=auth_pb2.SignupBasic(name="Christopher", email="a2@b.com"), 

604 account=auth_pb2.SignupAccount( 

605 username="ceelo", 

606 password="a very insecure password", 

607 city="New York City", 

608 birthdate="2000-12-31", # arbitrary birthdate older than 18 years 

609 gender="Helicopter", 

610 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

611 lat=1, 

612 lng=1, 

613 radius=100, 

614 accept_tos=True, 

615 ), 

616 feedback=auth_pb2.ContributorForm(), 

617 ) 

618 ) 

619 

620 assert res.flow_token 

621 

622 with pytest.raises(grpc.RpcError) as e: 

623 auth_api.SignupFlow( 

624 auth_pb2.SignupFlowReq( 

625 basic=auth_pb2.SignupBasic(name="Franklin", email="a3@b.com"), 

626 account=auth_pb2.SignupAccount( 

627 username="franklin", 

628 password="a very insecure password", 

629 city="Los Santos", 

630 birthdate="2010-04-09", # arbitrary birthdate < 18 yrs 

631 gender="Male", 

632 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

633 lat=1, 

634 lng=1, 

635 radius=100, 

636 accept_tos=True, 

637 ), 

638 feedback=auth_pb2.ContributorForm(), 

639 ) 

640 ) 

641 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

642 assert e.value.details() == errors.INVALID_BIRTHDATE 

643 

644 with session_scope() as session: 

645 assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1 

646 

647 

648def test_signup_invalid_email(db): 

649 with auth_api_session() as (auth_api, metadata_interceptor): 

650 with pytest.raises(grpc.RpcError) as e: 

651 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a"))) 

652 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

653 assert e.value.details() == errors.INVALID_EMAIL 

654 

655 with auth_api_session() as (auth_api, metadata_interceptor): 

656 with pytest.raises(grpc.RpcError) as e: 

657 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b"))) 

658 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

659 assert e.value.details() == errors.INVALID_EMAIL 

660 

661 with auth_api_session() as (auth_api, metadata_interceptor): 

662 with pytest.raises(grpc.RpcError) as e: 

663 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b."))) 

664 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

665 assert e.value.details() == errors.INVALID_EMAIL 

666 

667 with auth_api_session() as (auth_api, metadata_interceptor): 

668 with pytest.raises(grpc.RpcError) as e: 

669 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b.c"))) 

670 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

671 assert e.value.details() == errors.INVALID_EMAIL 

672 

673 

674def test_signup_existing_email(db): 

675 # Signed up user 

676 user, _ = generate_user() 

677 

678 with auth_api_session() as (auth_api, metadata_interceptor): 

679 with pytest.raises(grpc.RpcError) as e: 

680 reply = auth_api.SignupFlow( 

681 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=user.email)) 

682 ) 

683 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

684 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_TAKEN 

685 

686 

687def test_signup_continue_with_email(db): 

688 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

689 with auth_api_session() as (auth_api, metadata_interceptor): 

690 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

691 flow_token = res.flow_token 

692 assert flow_token 

693 

694 # continue with same email, should just send another email to the user 

695 with auth_api_session() as (auth_api, metadata_interceptor): 

696 with pytest.raises(grpc.RpcError) as e: 

697 res = auth_api.SignupFlow( 

698 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)) 

699 ) 

700 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

701 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP 

702 

703 

704def test_signup_resend_email(db): 

705 with auth_api_session() as (auth_api, metadata_interceptor): 

706 with mock_notification_email() as mock: 

707 res = auth_api.SignupFlow( 

708 auth_pb2.SignupFlowReq( 

709 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

710 account=auth_pb2.SignupAccount( 

711 username="frodo", 

712 password="a very insecure password", 

713 birthdate="1970-01-01", 

714 gender="Bot", 

715 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

716 city="New York City", 

717 lat=40.7331, 

718 lng=-73.9778, 

719 radius=500, 

720 accept_tos=True, 

721 ), 

722 feedback=auth_pb2.ContributorForm(), 

723 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

724 ) 

725 ) 

726 assert mock.call_count == 1 

727 e = email_fields(mock) 

728 assert e.recipient == "email@couchers.org.invalid" 

729 

730 flow_token = res.flow_token 

731 assert flow_token 

732 

733 with session_scope() as session: 

734 flow = session.execute(select(SignupFlow)).scalar_one() 

735 assert flow.flow_token == flow_token 

736 assert flow.email_sent 

737 assert not flow.email_verified 

738 email_token = flow.email_token 

739 

740 # ask for a new signup email 

741 with auth_api_session() as (auth_api, metadata_interceptor): 

742 with mock_notification_email() as mock: 

743 res = auth_api.SignupFlow( 

744 auth_pb2.SignupFlowReq( 

745 flow_token=flow_token, 

746 resend_verification_email=True, 

747 ) 

748 ) 

749 assert mock.call_count == 1 

750 e = email_fields(mock) 

751 assert e.recipient == "email@couchers.org.invalid" 

752 assert email_token in e.plain 

753 assert email_token in e.html 

754 

755 with session_scope() as session: 

756 flow = session.execute(select(SignupFlow)).scalar_one() 

757 assert not flow.email_verified 

758 

759 with auth_api_session() as (auth_api, metadata_interceptor): 

760 res = auth_api.SignupFlow( 

761 auth_pb2.SignupFlowReq( 

762 email_token=email_token, 

763 ) 

764 ) 

765 

766 assert not res.flow_token 

767 assert res.HasField("auth_res") 

768 

769 

770def test_successful_authenticate(db): 

771 user, _ = generate_user(hashed_password=hash_password("password")) 

772 

773 # Authenticate with username 

774 with auth_api_session() as (auth_api, metadata_interceptor): 

775 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="password")) 

776 assert not reply.jailed 

777 

778 # Authenticate with email 

779 with auth_api_session() as (auth_api, metadata_interceptor): 

780 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.email, password="password")) 

781 assert not reply.jailed 

782 

783 

784def test_unsuccessful_authenticate(db): 

785 user, _ = generate_user(hashed_password=hash_password("password")) 

786 

787 # Invalid password 

788 with auth_api_session() as (auth_api, metadata_interceptor): 

789 with pytest.raises(grpc.RpcError) as e: 

790 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="incorrectpassword")) 

791 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

792 assert e.value.details() == errors.INVALID_PASSWORD 

793 

794 # Invalid username 

795 with auth_api_session() as (auth_api, metadata_interceptor): 

796 with pytest.raises(grpc.RpcError) as e: 

797 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="notarealusername", password="password")) 

798 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

799 assert e.value.details() == errors.ACCOUNT_NOT_FOUND 

800 

801 # Invalid email 

802 with auth_api_session() as (auth_api, metadata_interceptor): 

803 with pytest.raises(grpc.RpcError) as e: 

804 reply = auth_api.Authenticate( 

805 auth_pb2.AuthReq(user=f"{random_hex(12)}@couchers.org.invalid", password="password") 

806 ) 

807 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

808 assert e.value.details() == errors.ACCOUNT_NOT_FOUND 

809 

810 # Invalid id 

811 with auth_api_session() as (auth_api, metadata_interceptor): 

812 with pytest.raises(grpc.RpcError) as e: 

813 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="-1", password="password")) 

814 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

815 assert e.value.details() == errors.ACCOUNT_NOT_FOUND 

816 

817 

818def test_complete_signup(db): 

819 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

820 with auth_api_session() as (auth_api, metadata_interceptor): 

821 reply = auth_api.SignupFlow( 

822 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email)) 

823 ) 

824 

825 flow_token = reply.flow_token 

826 

827 with auth_api_session() as (auth_api, metadata_interceptor): 

828 # Invalid username 

829 with pytest.raises(grpc.RpcError) as e: 

830 reply = auth_api.SignupFlow( 

831 auth_pb2.SignupFlowReq( 

832 flow_token=flow_token, 

833 account=auth_pb2.SignupAccount( 

834 username=" ", 

835 password="a very insecure password", 

836 city="Minas Tirith", 

837 birthdate="1980-12-31", 

838 gender="Robot", 

839 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

840 lat=1, 

841 lng=1, 

842 radius=100, 

843 accept_tos=True, 

844 ), 

845 ) 

846 ) 

847 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

848 assert e.value.details() == errors.INVALID_USERNAME 

849 

850 with auth_api_session() as (auth_api, metadata_interceptor): 

851 # Invalid name 

852 with pytest.raises(grpc.RpcError) as e: 

853 reply = auth_api.SignupFlow( 

854 auth_pb2.SignupFlowReq( 

855 basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid") 

856 ) 

857 ) 

858 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

859 assert e.value.details() == errors.INVALID_NAME 

860 

861 with auth_api_session() as (auth_api, metadata_interceptor): 

862 # Hosting status required 

863 with pytest.raises(grpc.RpcError) as e: 

864 reply = auth_api.SignupFlow( 

865 auth_pb2.SignupFlowReq( 

866 flow_token=flow_token, 

867 account=auth_pb2.SignupAccount( 

868 username="frodo", 

869 password="a very insecure password", 

870 city="Minas Tirith", 

871 birthdate="1980-12-31", 

872 gender="Robot", 

873 hosting_status=None, 

874 lat=1, 

875 lng=1, 

876 radius=100, 

877 accept_tos=True, 

878 ), 

879 ) 

880 ) 

881 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

882 assert e.value.details() == errors.HOSTING_STATUS_REQUIRED 

883 

884 user, _ = generate_user() 

885 with auth_api_session() as (auth_api, metadata_interceptor): 

886 # Username unavailable 

887 with pytest.raises(grpc.RpcError) as e: 

888 reply = auth_api.SignupFlow( 

889 auth_pb2.SignupFlowReq( 

890 flow_token=flow_token, 

891 account=auth_pb2.SignupAccount( 

892 username=user.username, 

893 password="a very insecure password", 

894 city="Minas Tirith", 

895 birthdate="1980-12-31", 

896 gender="Robot", 

897 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

898 lat=1, 

899 lng=1, 

900 radius=100, 

901 accept_tos=True, 

902 ), 

903 ) 

904 ) 

905 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

906 assert e.value.details() == errors.USERNAME_NOT_AVAILABLE 

907 

908 with auth_api_session() as (auth_api, metadata_interceptor): 

909 # Invalid coordinate 

910 with pytest.raises(grpc.RpcError) as e: 

911 reply = auth_api.SignupFlow( 

912 auth_pb2.SignupFlowReq( 

913 flow_token=flow_token, 

914 account=auth_pb2.SignupAccount( 

915 username="frodo", 

916 password="a very insecure password", 

917 city="Minas Tirith", 

918 birthdate="1980-12-31", 

919 gender="Robot", 

920 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

921 lat=0, 

922 lng=0, 

923 radius=100, 

924 accept_tos=True, 

925 ), 

926 ) 

927 ) 

928 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

929 assert e.value.details() == errors.INVALID_COORDINATE 

930 

931 

932def test_signup_token_regression(db): 

933 # Repro steps: 

934 # 1. Start a signup 

935 # 2. Confirm the email 

936 # 3. Start a new signup with the same email 

937 # Expected: send a link to the email to continue signing up. 

938 # Actual: `AttributeError: 'SignupFlow' object has no attribute 'token'` 

939 

940 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

941 

942 # 1. Start a signup 

943 with auth_api_session() as (auth_api, metadata_interceptor): 

944 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

945 flow_token = res.flow_token 

946 assert flow_token 

947 

948 # 2. Confirm the email 

949 with session_scope() as session: 

950 email_token = ( 

951 session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one().email_token 

952 ) 

953 

954 with auth_api_session() as (auth_api, metadata_interceptor): 

955 res = auth_api.SignupFlow( 

956 auth_pb2.SignupFlowReq( 

957 flow_token=flow_token, 

958 email_token=email_token, 

959 ) 

960 ) 

961 

962 # 3. Start a new signup with the same email 

963 with auth_api_session() as (auth_api, metadata_interceptor): 

964 with pytest.raises(grpc.RpcError) as e: 

965 res = auth_api.SignupFlow( 

966 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)) 

967 ) 

968 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

969 assert e.value.details() == errors.SIGNUP_FLOW_EMAIL_STARTED_SIGNUP 

970 

971 

972@pytest.mark.parametrize("opt_out", [True, False]) 

973def test_opt_out_of_newsletter(db, opt_out): 

974 with auth_api_session() as (auth_api, metadata_interceptor): 

975 res = auth_api.SignupFlow( 

976 auth_pb2.SignupFlowReq( 

977 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

978 account=auth_pb2.SignupAccount( 

979 username="frodo", 

980 password="a very insecure password", 

981 birthdate="1970-01-01", 

982 gender="Bot", 

983 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

984 city="New York City", 

985 lat=40.7331, 

986 lng=-73.9778, 

987 radius=500, 

988 accept_tos=True, 

989 opt_out_of_newsletter=opt_out, 

990 ), 

991 feedback=auth_pb2.ContributorForm(), 

992 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

993 ) 

994 ) 

995 

996 with session_scope() as session: 

997 email_token = ( 

998 session.execute(select(SignupFlow).where(SignupFlow.flow_token == res.flow_token)).scalar_one().email_token 

999 ) 

1000 

1001 with auth_api_session() as (auth_api, metadata_interceptor): 

1002 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1003 

1004 user_id = res.auth_res.user_id 

1005 

1006 with session_scope() as session: 

1007 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

1008 assert not user.in_sync_with_newsletter 

1009 assert user.opt_out_of_newsletter == opt_out 

1010 

1011 

1012def test_GetAuthState(db): 

1013 user, token = generate_user() 

1014 jailed_user, jailed_token = generate_user(accepted_tos=0) 

1015 

1016 with auth_api_session() as (auth_api, metadata_interceptor): 

1017 res = auth_api.GetAuthState(empty_pb2.Empty()) 

1018 assert not res.logged_in 

1019 assert not res.HasField("auth_res") 

1020 

1021 with auth_api_session() as (auth_api, metadata_interceptor): 

1022 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1023 assert res.logged_in 

1024 assert res.HasField("auth_res") 

1025 assert res.auth_res.user_id == user.id 

1026 assert not res.auth_res.jailed 

1027 

1028 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1029 

1030 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1031 assert not res.logged_in 

1032 assert not res.HasField("auth_res") 

1033 

1034 with auth_api_session() as (auth_api, metadata_interceptor): 

1035 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={jailed_token}"),)) 

1036 assert res.logged_in 

1037 assert res.HasField("auth_res") 

1038 assert res.auth_res.user_id == jailed_user.id 

1039 assert res.auth_res.jailed 

1040 

1041 

1042# tests for ConfirmChangeEmail within test_account.py tests for test_ChangeEmail_*