Coverage for app / backend / src / tests / test_auth.py: 100%

701 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import http.cookies 

2from typing import cast 

3 

4import grpc 

5import pytest 

6from google.protobuf import empty_pb2, wrappers_pb2 

7from sqlalchemy import select, update 

8from sqlalchemy.sql import delete, func 

9 

10from couchers import urls 

11from couchers.crypto import hash_password, random_hex 

12from couchers.db import session_scope 

13from couchers.models import ( 

14 ContributeOption, 

15 ContributorForm, 

16 LoginToken, 

17 PasswordResetToken, 

18 SignupFlow, 

19 User, 

20 UserSession, 

21) 

22from couchers.proto import account_pb2, api_pb2, auth_pb2 

23from couchers.utils import now 

24from tests.fixtures.db import generate_user 

25from tests.fixtures.misc import PushCollector, email_fields, mock_notification_email 

26from tests.fixtures.sessions import ( 

27 MetadataKeeperInterceptor, 

28 account_session, 

29 api_session, 

30 auth_api_session, 

31 real_api_session, 

32) 

33 

34 

35@pytest.fixture(autouse=True) 

36def _(testconfig, fast_passwords): 

37 pass 

38 

39 

40def get_session_cookie_tokens(metadata_interceptor: MetadataKeeperInterceptor) -> tuple[str, str]: 

41 set_cookies = [val for key, val in metadata_interceptor.latest_header_raw if key == "set-cookie"] 

42 sesh = http.cookies.SimpleCookie([v for v in set_cookies if "sesh" in v][0])["couchers-sesh"].value 

43 uid = http.cookies.SimpleCookie([v for v in set_cookies if "user-id" in v][0])["couchers-user-id"].value 

44 return sesh, uid 

45 

46 

47def test_UsernameValid(db): 

48 with auth_api_session() as (auth_api, metadata_interceptor): 

49 assert auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="test")).valid 

50 

51 with auth_api_session() as (auth_api, metadata_interceptor): 

52 assert not auth_api.UsernameValid(auth_pb2.UsernameValidReq(username="")).valid 

53 

54 

55def test_signup_incremental(db): 

56 with auth_api_session() as (auth_api, metadata_interceptor): 

57 res = auth_api.SignupFlow( 

58 auth_pb2.SignupFlowReq( 

59 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

60 ) 

61 ) 

62 

63 flow_token = res.flow_token 

64 assert res.flow_token 

65 assert not res.HasField("auth_res") 

66 assert not res.need_basic 

67 assert res.need_account 

68 assert not res.need_feedback 

69 assert res.need_verify_email 

70 assert res.need_accept_community_guidelines 

71 assert res.need_motivations 

72 

73 # read out the signup token directly from the database for now 

74 with session_scope() as session: 

75 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

76 assert flow.email_sent 

77 assert not flow.email_verified 

78 email_token = flow.email_token 

79 

80 with auth_api_session() as (auth_api, metadata_interceptor): 

81 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(flow_token=flow_token)) 

82 

83 assert res.flow_token == flow_token 

84 assert not res.HasField("auth_res") 

85 assert not res.need_basic 

86 assert res.need_account 

87 assert not res.need_feedback 

88 assert res.need_verify_email 

89 assert res.need_accept_community_guidelines 

90 assert res.need_motivations 

91 

92 # Add feedback 

93 with auth_api_session() as (auth_api, metadata_interceptor): 

94 res = auth_api.SignupFlow( 

95 auth_pb2.SignupFlowReq( 

96 flow_token=flow_token, 

97 feedback=auth_pb2.ContributorForm( 

98 ideas="I'm a robot, incapable of original ideation", 

99 features="I love all your features", 

100 experience="I haven't done couch surfing before", 

101 contribute=auth_pb2.CONTRIBUTE_OPTION_YES, 

102 contribute_ways=["serving", "backend"], 

103 expertise="I'd love to be your server: I can compute very fast, but only simple opcodes", 

104 ), 

105 ) 

106 ) 

107 

108 assert res.flow_token == flow_token 

109 assert not res.HasField("auth_res") 

110 assert not res.need_basic 

111 assert res.need_account 

112 assert not res.need_feedback 

113 assert res.need_verify_email 

114 assert res.need_accept_community_guidelines 

115 assert res.need_motivations 

116 

117 # Agree to community guidelines 

118 with auth_api_session() as (auth_api, metadata_interceptor): 

119 res = auth_api.SignupFlow( 

120 auth_pb2.SignupFlowReq( 

121 flow_token=flow_token, 

122 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

123 ) 

124 ) 

125 

126 assert res.flow_token == flow_token 

127 assert not res.HasField("auth_res") 

128 assert not res.need_basic 

129 assert res.need_account 

130 assert not res.need_feedback 

131 assert res.need_verify_email 

132 assert not res.need_accept_community_guidelines 

133 assert res.need_motivations 

134 

135 # Submit motivations 

136 with auth_api_session() as (auth_api, metadata_interceptor): 

137 res = auth_api.SignupFlow( 

138 auth_pb2.SignupFlowReq( 

139 flow_token=flow_token, 

140 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]), 

141 ) 

142 ) 

143 

144 assert res.flow_token == flow_token 

145 assert not res.HasField("auth_res") 

146 assert not res.need_basic 

147 assert res.need_account 

148 assert not res.need_feedback 

149 assert res.need_verify_email 

150 assert not res.need_accept_community_guidelines 

151 assert not res.need_motivations 

152 

153 # Verify email 

154 with auth_api_session() as (auth_api, metadata_interceptor): 

155 res = auth_api.SignupFlow( 

156 auth_pb2.SignupFlowReq( 

157 flow_token=flow_token, 

158 email_token=email_token, 

159 ) 

160 ) 

161 

162 assert res.flow_token == flow_token 

163 assert not res.HasField("auth_res") 

164 assert not res.need_basic 

165 assert res.need_account 

166 assert not res.need_feedback 

167 assert not res.need_verify_email 

168 assert not res.need_accept_community_guidelines 

169 assert not res.need_motivations 

170 

171 # Finally finish off account info 

172 with auth_api_session() as (auth_api, metadata_interceptor): 

173 res = auth_api.SignupFlow( 

174 auth_pb2.SignupFlowReq( 

175 flow_token=flow_token, 

176 account=auth_pb2.SignupAccount( 

177 username="frodo", 

178 password="a very insecure password", 

179 birthdate="1970-01-01", 

180 gender="Bot", 

181 hosting_status=api_pb2.HOSTING_STATUS_MAYBE, 

182 city="New York City", 

183 lat=40.7331, 

184 lng=-73.9778, 

185 radius=500, 

186 accept_tos=True, 

187 ), 

188 ) 

189 ) 

190 

191 assert not res.flow_token 

192 assert res.HasField("auth_res") 

193 assert res.auth_res.user_id 

194 assert not res.auth_res.jailed 

195 assert not res.need_basic 

196 assert not res.need_account 

197 assert not res.need_feedback 

198 assert not res.need_verify_email 

199 assert not res.need_accept_community_guidelines 

200 assert not res.need_motivations 

201 

202 user_id = res.auth_res.user_id 

203 

204 sess_token, uid = get_session_cookie_tokens(metadata_interceptor) 

205 assert uid == str(user_id) 

206 

207 with api_session(sess_token) as api: 

208 res = api.GetUser(api_pb2.GetUserReq(user=str(user_id))) 

209 

210 assert res.username == "frodo" 

211 assert res.gender == "Bot" 

212 assert res.hosting_status == api_pb2.HOSTING_STATUS_MAYBE 

213 assert res.city == "New York City" 

214 assert res.lat == 40.7331 

215 assert res.lng == -73.9778 

216 assert res.radius == 500 

217 

218 with session_scope() as session: 

219 form = session.execute(select(ContributorForm)).scalar_one() 

220 

221 assert form.ideas == "I'm a robot, incapable of original ideation" 

222 assert form.features == "I love all your features" 

223 assert form.experience == "I haven't done couch surfing before" 

224 assert form.contribute == ContributeOption.yes 

225 assert form.contribute_ways == ["serving", "backend"] 

226 assert form.expertise == "I'd love to be your server: I can compute very fast, but only simple opcodes" 

227 

228 

229def _quick_signup() -> int: 

230 with auth_api_session() as (auth_api, metadata_interceptor): 

231 res = auth_api.SignupFlow( 

232 auth_pb2.SignupFlowReq( 

233 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

234 account=auth_pb2.SignupAccount( 

235 username="frodo", 

236 password="a very insecure password", 

237 birthdate="1970-01-01", 

238 gender="Bot", 

239 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

240 city="New York City", 

241 lat=40.7331, 

242 lng=-73.9778, 

243 radius=500, 

244 accept_tos=True, 

245 ), 

246 feedback=auth_pb2.ContributorForm(), 

247 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

248 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]), 

249 ) 

250 ) 

251 

252 flow_token = res.flow_token 

253 

254 assert res.flow_token 

255 assert not res.HasField("auth_res") 

256 assert not res.need_basic 

257 assert not res.need_account 

258 assert not res.need_feedback 

259 assert not res.need_motivations 

260 assert res.need_verify_email 

261 

262 # read out the signup token directly from the database for now 

263 with session_scope() as session: 

264 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

265 assert flow.email_sent 

266 assert not flow.email_verified 

267 email_token = flow.email_token 

268 

269 with auth_api_session() as (auth_api, metadata_interceptor): 

270 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

271 

272 assert not res.flow_token 

273 assert res.HasField("auth_res") 

274 assert res.auth_res.user_id 

275 assert not res.auth_res.jailed 

276 assert not res.need_basic 

277 assert not res.need_account 

278 assert not res.need_feedback 

279 assert not res.need_motivations 

280 assert not res.need_verify_email 

281 

282 # make sure we got the right token in a cookie 

283 with session_scope() as session: 

284 token = session.execute( 

285 select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo") 

286 ).scalar_one() 

287 sesh, uid = get_session_cookie_tokens(metadata_interceptor) 

288 assert sesh == token 

289 

290 return cast(int, res.auth_res.user_id) 

291 

292 

293def test_signup(db): 

294 _quick_signup() 

295 

296 

297def test_basic_login(db): 

298 # Create our test user using signup 

299 _quick_signup() 

300 

301 with auth_api_session() as (auth_api, metadata_interceptor): 

302 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

303 

304 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

305 

306 with session_scope() as session: 

307 token = session.execute( 

308 select(UserSession.token) 

309 .join(User, UserSession.user_id == User.id) 

310 .where(User.username == "frodo") 

311 .where(UserSession.token == reply_token) 

312 .where(UserSession.is_valid) 

313 ).scalar_one_or_none() 

314 assert token 

315 

316 # log out 

317 with auth_api_session() as (auth_api, metadata_interceptor): 

318 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),)) 

319 

320 

321def test_login_part_signed_up_verified_email(db): 

322 """ 

323 If you try to log in but didn't finish singing up, we send you a new email and ask you to finish signing up. 

324 """ 

325 with auth_api_session() as (auth_api, metadata_interceptor): 

326 res = auth_api.SignupFlow( 

327 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid")) 

328 ) 

329 

330 flow_token = res.flow_token 

331 assert res.need_verify_email 

332 

333 # verify the email 

334 with session_scope() as session: 

335 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

336 flow_token = flow.flow_token 

337 email_token = flow.email_token 

338 with auth_api_session() as (auth_api, metadata_interceptor): 

339 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

340 

341 with mock_notification_email() as mock: 

342 with auth_api_session() as (auth_api, metadata_interceptor): 

343 with pytest.raises(grpc.RpcError) as err: 

344 auth_api.Authenticate(auth_pb2.AuthReq(user="email@couchers.org.invalid", password="wrong pwd")) 

345 assert err.value.details() == "Please check your email for a link to continue signing up." 

346 

347 assert mock.call_count == 1 

348 e = email_fields(mock) 

349 assert e.recipient == "email@couchers.org.invalid" 

350 assert flow_token in e.plain 

351 assert flow_token in e.html 

352 

353 

354def test_login_part_signed_up_not_verified_email(db): 

355 with auth_api_session() as (auth_api, metadata_interceptor): 

356 res = auth_api.SignupFlow( 

357 auth_pb2.SignupFlowReq( 

358 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

359 account=auth_pb2.SignupAccount( 

360 username="frodo", 

361 password="a very insecure password", 

362 birthdate="1999-01-01", 

363 gender="Bot", 

364 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

365 city="New York City", 

366 lat=40.7331, 

367 lng=-73.9778, 

368 radius=500, 

369 accept_tos=True, 

370 ), 

371 ) 

372 ) 

373 

374 flow_token = res.flow_token 

375 assert res.need_verify_email 

376 

377 with mock_notification_email() as mock: 

378 with auth_api_session() as (auth_api, metadata_interceptor): 

379 with pytest.raises(grpc.RpcError) as err: 

380 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="wrong pwd")) 

381 assert err.value.details() == "Please check your email for a link to continue signing up." 

382 

383 with session_scope() as session: 

384 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

385 email_token = flow.email_token 

386 

387 assert mock.call_count == 1 

388 e = email_fields(mock) 

389 assert e.recipient == "email@couchers.org.invalid" 

390 assert email_token 

391 assert email_token in e.plain 

392 assert email_token in e.html 

393 

394 

395def test_banned_user(db): 

396 _quick_signup() 

397 

398 with session_scope() as session: 

399 session.execute(select(User)).scalar_one().banned_at = now() 

400 

401 with auth_api_session() as (auth_api, metadata_interceptor): 

402 with pytest.raises(grpc.RpcError) as e: 

403 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

404 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

405 assert e.value.details() == "Your account is suspended." 

406 

407 

408def test_deleted_user(db): 

409 user_id = _quick_signup() 

410 

411 with session_scope() as session: 

412 session.execute(update(User).where(User.id == user_id).values(deleted_at=func.now())) 

413 

414 with auth_api_session() as (auth_api, metadata_interceptor): 

415 with pytest.raises(grpc.RpcError) as e: 

416 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

417 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

418 assert e.value.details() == "An account with that username or email was not found." 

419 

420 

421def test_invalid_token(db): 

422 user1, token1 = generate_user() 

423 user2, token2 = generate_user() 

424 

425 wrong_token = random_hex(32) 

426 

427 with real_api_session(wrong_token) as api, pytest.raises(grpc.RpcError) as e: 

428 res = api.GetUser(api_pb2.GetUserReq(user=user2.username)) 

429 

430 assert e.value.code() == grpc.StatusCode.UNAUTHENTICATED 

431 assert e.value.details() == "Unauthorized" 

432 

433 

434def test_password_reset_v2(db, push_collector: PushCollector): 

435 user, token = generate_user(hashed_password=hash_password("mypassword")) 

436 

437 with auth_api_session() as (auth_api, metadata_interceptor): 

438 with mock_notification_email() as mock: 

439 res = auth_api.ResetPassword(auth_pb2.ResetPasswordReq(user=user.username)) 

440 

441 with session_scope() as session: 

442 password_reset_token = session.execute(select(PasswordResetToken.token)).scalar_one() 

443 

444 assert mock.call_count == 1 

445 e = email_fields(mock) 

446 assert e.recipient == user.email 

447 assert "reset" in e.subject.lower() 

448 assert password_reset_token in e.plain 

449 assert password_reset_token in e.html 

450 unique_string = "You asked for your password to be reset on Couchers.org." 

451 assert unique_string in e.plain 

452 assert unique_string in e.html 

453 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.plain 

454 assert f"http://localhost:3000/complete-password-reset?token={password_reset_token}" in e.html 

455 assert "support@couchers.org" in e.plain 

456 assert "support@couchers.org" in e.html 

457 

458 push = push_collector.pop_for_user(user.id, last=True) 

459 assert push.content.title == "Password reset requested" 

460 assert push.content.body == "Use the link we sent by email to complete it." 

461 

462 # make sure bad password are caught 

463 with auth_api_session() as (auth_api, metadata_interceptor): 

464 with pytest.raises(grpc.RpcError) as err: 

465 auth_api.CompletePasswordResetV2( 

466 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password="password") 

467 ) 

468 assert err.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

469 assert err.value.details() == "The password is insecure. Please use one that is not easily guessable." 

470 

471 # make sure we can set a good password 

472 with auth_api_session() as (auth_api, metadata_interceptor): 

473 pwd = random_hex() 

474 with mock_notification_email() as mock: 

475 auth_api.CompletePasswordResetV2( 

476 auth_pb2.CompletePasswordResetV2Req(password_reset_token=password_reset_token, new_password=pwd) 

477 ) 

478 

479 push = push_collector.pop_for_user(user.id, last=True) 

480 assert push.content.title == "Password reset" 

481 assert push.content.body == "Your password was successfully reset." 

482 

483 session_token, _ = get_session_cookie_tokens(metadata_interceptor) 

484 

485 with session_scope() as session: 

486 other_session_token = session.execute( 

487 select(UserSession.token) 

488 .join(User, UserSession.user_id == User.id) 

489 .where(User.username == user.username) 

490 .where(UserSession.token == session_token) 

491 .where(UserSession.is_valid) 

492 ).scalar_one_or_none() 

493 assert other_session_token 

494 

495 # make sure we can't set a password again 

496 with auth_api_session() as (auth_api, metadata_interceptor): 

497 with pytest.raises(grpc.RpcError) as err: 

498 auth_api.CompletePasswordResetV2( 

499 auth_pb2.CompletePasswordResetV2Req( 

500 password_reset_token=password_reset_token, new_password=random_hex() 

501 ) 

502 ) 

503 assert err.value.code() == grpc.StatusCode.NOT_FOUND 

504 assert err.value.details() == "Invalid token." 

505 

506 with session_scope() as session: 

507 user = session.execute(select(User)).scalar_one() 

508 assert user.hashed_password == hash_password(pwd) 

509 

510 

511def test_password_reset_no_such_user(db): 

512 user, token = generate_user() 

513 

514 with auth_api_session() as (auth_api, metadata_interceptor): 

515 res = auth_api.ResetPassword( 

516 auth_pb2.ResetPasswordReq( 

517 user="nonexistentuser", 

518 ) 

519 ) 

520 

521 with session_scope() as session: 

522 assert session.execute(select(PasswordResetToken)).scalar_one_or_none() is None 

523 

524 

525def test_password_reset_invalid_token_v2(db): 

526 password = random_hex() 

527 user, token = generate_user(hashed_password=hash_password(password)) 

528 

529 with auth_api_session() as (auth_api, metadata_interceptor): 

530 res = auth_api.ResetPassword( 

531 auth_pb2.ResetPasswordReq( 

532 user=user.username, 

533 ) 

534 ) 

535 

536 with auth_api_session() as (auth_api, metadata_interceptor), pytest.raises(grpc.RpcError) as e: 

537 res = auth_api.CompletePasswordResetV2(auth_pb2.CompletePasswordResetV2Req(password_reset_token="wrongtoken")) 

538 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

539 assert e.value.details() == "Invalid token." 

540 

541 with session_scope() as session: 

542 user = session.execute(select(User)).scalar_one() 

543 assert user.hashed_password == hash_password(password) 

544 

545 

546def test_logout_invalid_token(db): 

547 # Create our test user using signup 

548 _quick_signup() 

549 

550 with auth_api_session() as (auth_api, metadata_interceptor): 

551 auth_api.Authenticate(auth_pb2.AuthReq(user="frodo", password="a very insecure password")) 

552 

553 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

554 

555 # delete all login tokens 

556 with session_scope() as session: 

557 session.execute(delete(LoginToken)) 

558 

559 # log out with non-existent token should still return a valid result 

560 with auth_api_session() as (auth_api, metadata_interceptor): 

561 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={reply_token}"),)) 

562 

563 reply_token, _ = get_session_cookie_tokens(metadata_interceptor) 

564 # make sure we set an empty cookie 

565 assert reply_token == "" 

566 

567 

568def test_signup_without_password(db): 

569 with auth_api_session() as (auth_api, metadata_interceptor): 

570 with pytest.raises(grpc.RpcError) as e: 

571 auth_api.SignupFlow( 

572 auth_pb2.SignupFlowReq( 

573 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"), 

574 account=auth_pb2.SignupAccount( 

575 username="frodo", 

576 password="bad", 

577 city="Minas Tirith", 

578 birthdate="9999-12-31", # arbitrary future birthdate 

579 gender="Robot", 

580 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

581 lat=1, 

582 lng=1, 

583 radius=100, 

584 accept_tos=True, 

585 ), 

586 feedback=auth_pb2.ContributorForm(), 

587 ) 

588 ) 

589 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

590 assert e.value.details() == "The password must be 8 or more characters long." 

591 

592 

593def test_signup_invalid_birthdate(db): 

594 with auth_api_session() as (auth_api, metadata_interceptor): 

595 with pytest.raises(grpc.RpcError) as e: 

596 auth_api.SignupFlow( 

597 auth_pb2.SignupFlowReq( 

598 basic=auth_pb2.SignupBasic(name="Räksmörgås", email="a1@b.com"), 

599 account=auth_pb2.SignupAccount( 

600 username="frodo", 

601 password="a very insecure password", 

602 city="Minas Tirith", 

603 birthdate="9999-12-31", # arbitrary future birthdate 

604 gender="Robot", 

605 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

606 lat=1, 

607 lng=1, 

608 radius=100, 

609 accept_tos=True, 

610 ), 

611 feedback=auth_pb2.ContributorForm(), 

612 ) 

613 ) 

614 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

615 assert e.value.details() == "You must be at least 18 years old to sign up." 

616 

617 res = auth_api.SignupFlow( 

618 auth_pb2.SignupFlowReq( 

619 basic=auth_pb2.SignupBasic(name="Christopher", email="a2@b.com"), 

620 account=auth_pb2.SignupAccount( 

621 username="ceelo", 

622 password="a very insecure password", 

623 city="New York City", 

624 birthdate="2000-12-31", # arbitrary birthdate older than 18 years 

625 gender="Helicopter", 

626 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

627 lat=1, 

628 lng=1, 

629 radius=100, 

630 accept_tos=True, 

631 ), 

632 feedback=auth_pb2.ContributorForm(), 

633 ) 

634 ) 

635 

636 assert res.flow_token 

637 

638 with pytest.raises(grpc.RpcError) as e: 

639 auth_api.SignupFlow( 

640 auth_pb2.SignupFlowReq( 

641 basic=auth_pb2.SignupBasic(name="Franklin", email="a3@b.com"), 

642 account=auth_pb2.SignupAccount( 

643 username="franklin", 

644 password="a very insecure password", 

645 city="Los Santos", 

646 birthdate="2010-04-09", # arbitrary birthdate < 18 yrs 

647 gender="Male", 

648 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

649 lat=1, 

650 lng=1, 

651 radius=100, 

652 accept_tos=True, 

653 ), 

654 feedback=auth_pb2.ContributorForm(), 

655 ) 

656 ) 

657 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

658 assert e.value.details() == "You must be at least 18 years old to sign up." 

659 

660 with session_scope() as session: 

661 assert session.execute(select(func.count()).select_from(SignupFlow)).scalar_one() == 1 

662 

663 

664def test_signup_invalid_email(db): 

665 with auth_api_session() as (auth_api, metadata_interceptor): 

666 with pytest.raises(grpc.RpcError) as e: 

667 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a"))) 

668 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

669 assert e.value.details() == "Invalid email." 

670 

671 with auth_api_session() as (auth_api, metadata_interceptor): 

672 with pytest.raises(grpc.RpcError) as e: 

673 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b"))) 

674 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

675 assert e.value.details() == "Invalid email." 

676 

677 with auth_api_session() as (auth_api, metadata_interceptor): 

678 with pytest.raises(grpc.RpcError) as e: 

679 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b."))) 

680 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

681 assert e.value.details() == "Invalid email." 

682 

683 with auth_api_session() as (auth_api, metadata_interceptor): 

684 with pytest.raises(grpc.RpcError) as e: 

685 reply = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email="a@b.c"))) 

686 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

687 assert e.value.details() == "Invalid email." 

688 

689 

690def test_signup_existing_email(db): 

691 # Signed up user 

692 user, _ = generate_user() 

693 

694 with auth_api_session() as (auth_api, metadata_interceptor): 

695 with pytest.raises(grpc.RpcError) as e: 

696 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=user.email))) 

697 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

698 assert e.value.details() == "That email address is already associated with an account. Please log in instead!" 

699 

700 

701def test_signup_banned_user_email(db): 

702 user, _ = generate_user() 

703 

704 with session_scope() as session: 

705 session.execute(update(User).where(User.id == user.id).values(banned_at=func.now())) 

706 

707 with auth_api_session() as (auth_api, _): 

708 with pytest.raises(grpc.RpcError) as e: 

709 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email))) 

710 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

711 assert e.value.details() == "You cannot sign up with that email address." 

712 

713 

714def test_signup_deleted_user_email(db): 

715 user, _ = generate_user() 

716 

717 with session_scope() as session: 

718 session.execute(update(User).where(User.id == user.id).values(deleted_at=func.now())) 

719 

720 with auth_api_session() as (auth_api, _): 

721 with pytest.raises(grpc.RpcError) as e: 

722 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="NewName", email=user.email))) 

723 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

724 assert e.value.details() == "You cannot sign up with that email address." 

725 

726 

727def test_signup_continue_with_email(db): 

728 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

729 with auth_api_session() as (auth_api, metadata_interceptor): 

730 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

731 flow_token = res.flow_token 

732 assert flow_token 

733 

734 # continue with same email, should just send another email to the user 

735 with auth_api_session() as (auth_api, metadata_interceptor): 

736 with pytest.raises(grpc.RpcError) as e: 

737 res = auth_api.SignupFlow( 

738 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email)) 

739 ) 

740 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

741 assert e.value.details() == "Please check your email for a link to continue signing up." 

742 

743 

744def test_signup_resend_email(db): 

745 with auth_api_session() as (auth_api, metadata_interceptor): 

746 with mock_notification_email() as mock: 

747 res = auth_api.SignupFlow( 

748 auth_pb2.SignupFlowReq( 

749 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

750 account=auth_pb2.SignupAccount( 

751 username="frodo", 

752 password="a very insecure password", 

753 birthdate="1970-01-01", 

754 gender="Bot", 

755 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

756 city="New York City", 

757 lat=40.7331, 

758 lng=-73.9778, 

759 radius=500, 

760 accept_tos=True, 

761 ), 

762 feedback=auth_pb2.ContributorForm(), 

763 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

764 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]), 

765 ) 

766 ) 

767 assert mock.call_count == 1 

768 e = email_fields(mock) 

769 assert e.recipient == "email@couchers.org.invalid" 

770 

771 flow_token = res.flow_token 

772 assert flow_token 

773 

774 with session_scope() as session: 

775 flow = session.execute(select(SignupFlow)).scalar_one() 

776 assert flow.flow_token == flow_token 

777 assert flow.email_sent 

778 assert not flow.email_verified 

779 email_token = flow.email_token 

780 

781 # ask for a new signup email 

782 with auth_api_session() as (auth_api, metadata_interceptor): 

783 with mock_notification_email() as mock: 

784 res = auth_api.SignupFlow( 

785 auth_pb2.SignupFlowReq( 

786 flow_token=flow_token, 

787 resend_verification_email=True, 

788 ) 

789 ) 

790 assert mock.call_count == 1 

791 e = email_fields(mock) 

792 assert e.recipient == "email@couchers.org.invalid" 

793 assert email_token 

794 assert email_token in e.plain 

795 assert email_token in e.html 

796 

797 with session_scope() as session: 

798 flow = session.execute(select(SignupFlow)).scalar_one() 

799 assert not flow.email_verified 

800 

801 with auth_api_session() as (auth_api, metadata_interceptor): 

802 res = auth_api.SignupFlow( 

803 auth_pb2.SignupFlowReq( 

804 email_token=email_token, 

805 ) 

806 ) 

807 

808 assert not res.flow_token 

809 assert res.HasField("auth_res") 

810 

811 

812def test_successful_authenticate(db): 

813 user, _ = generate_user(hashed_password=hash_password("password")) 

814 

815 # Authenticate with username 

816 with auth_api_session() as (auth_api, metadata_interceptor): 

817 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="password")) 

818 assert not reply.jailed 

819 

820 # Authenticate with email 

821 with auth_api_session() as (auth_api, metadata_interceptor): 

822 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.email, password="password")) 

823 assert not reply.jailed 

824 

825 

826def test_unsuccessful_authenticate(db): 

827 user, _ = generate_user(hashed_password=hash_password("password")) 

828 

829 # Invalid password 

830 with auth_api_session() as (auth_api, metadata_interceptor): 

831 with pytest.raises(grpc.RpcError) as e: 

832 reply = auth_api.Authenticate(auth_pb2.AuthReq(user=user.username, password="incorrectpassword")) 

833 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

834 assert e.value.details() == "Wrong password." 

835 

836 # Invalid username 

837 with auth_api_session() as (auth_api, metadata_interceptor): 

838 with pytest.raises(grpc.RpcError) as e: 

839 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="notarealusername", password="password")) 

840 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

841 assert e.value.details() == "An account with that username or email was not found." 

842 

843 # Invalid email 

844 with auth_api_session() as (auth_api, metadata_interceptor): 

845 with pytest.raises(grpc.RpcError) as e: 

846 reply = auth_api.Authenticate( 

847 auth_pb2.AuthReq(user=f"{random_hex(12)}@couchers.org.invalid", password="password") 

848 ) 

849 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

850 assert e.value.details() == "An account with that username or email was not found." 

851 

852 # Invalid id 

853 with auth_api_session() as (auth_api, metadata_interceptor): 

854 with pytest.raises(grpc.RpcError) as e: 

855 reply = auth_api.Authenticate(auth_pb2.AuthReq(user="-1", password="password")) 

856 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

857 assert e.value.details() == "An account with that username or email was not found." 

858 

859 

860def test_complete_signup(db): 

861 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

862 with auth_api_session() as (auth_api, metadata_interceptor): 

863 reply = auth_api.SignupFlow( 

864 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email)) 

865 ) 

866 

867 flow_token = reply.flow_token 

868 

869 with auth_api_session() as (auth_api, metadata_interceptor): 

870 # Invalid username 

871 with pytest.raises(grpc.RpcError) as e: 

872 auth_api.SignupFlow( 

873 auth_pb2.SignupFlowReq( 

874 flow_token=flow_token, 

875 account=auth_pb2.SignupAccount( 

876 username=" ", 

877 password="a very insecure password", 

878 city="Minas Tirith", 

879 birthdate="1980-12-31", 

880 gender="Robot", 

881 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

882 lat=1, 

883 lng=1, 

884 radius=100, 

885 accept_tos=True, 

886 ), 

887 ) 

888 ) 

889 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

890 assert e.value.details() == "Invalid username." 

891 

892 with auth_api_session() as (auth_api, metadata_interceptor): 

893 # Invalid name 

894 with pytest.raises(grpc.RpcError) as e: 

895 auth_api.SignupFlow( 

896 auth_pb2.SignupFlowReq( 

897 basic=auth_pb2.SignupBasic(name=" ", email=f"{random_hex(12)}@couchers.org.invalid") 

898 ) 

899 ) 

900 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

901 assert e.value.details() == "Name not supported." 

902 

903 with auth_api_session() as (auth_api, metadata_interceptor): 

904 # Hosting status required 

905 with pytest.raises(grpc.RpcError) as e: 

906 auth_api.SignupFlow( 

907 auth_pb2.SignupFlowReq( 

908 flow_token=flow_token, 

909 account=auth_pb2.SignupAccount( 

910 username="frodo", 

911 password="a very insecure password", 

912 city="Minas Tirith", 

913 birthdate="1980-12-31", 

914 gender="Robot", 

915 hosting_status=None, 

916 lat=1, 

917 lng=1, 

918 radius=100, 

919 accept_tos=True, 

920 ), 

921 ) 

922 ) 

923 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

924 assert e.value.details() == "Hosting status is required." 

925 

926 user, _ = generate_user() 

927 with auth_api_session() as (auth_api, metadata_interceptor): 

928 # Username unavailable 

929 with pytest.raises(grpc.RpcError) as e: 

930 auth_api.SignupFlow( 

931 auth_pb2.SignupFlowReq( 

932 flow_token=flow_token, 

933 account=auth_pb2.SignupAccount( 

934 username=user.username, 

935 password="a very insecure password", 

936 city="Minas Tirith", 

937 birthdate="1980-12-31", 

938 gender="Robot", 

939 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

940 lat=1, 

941 lng=1, 

942 radius=100, 

943 accept_tos=True, 

944 ), 

945 ) 

946 ) 

947 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

948 assert e.value.details() == "Sorry, that username isn't available." 

949 

950 with auth_api_session() as (auth_api, metadata_interceptor): 

951 # Invalid coordinate 

952 with pytest.raises(grpc.RpcError) as e: 

953 auth_api.SignupFlow( 

954 auth_pb2.SignupFlowReq( 

955 flow_token=flow_token, 

956 account=auth_pb2.SignupAccount( 

957 username="frodo", 

958 password="a very insecure password", 

959 city="Minas Tirith", 

960 birthdate="1980-12-31", 

961 gender="Robot", 

962 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

963 lat=0, 

964 lng=0, 

965 radius=100, 

966 accept_tos=True, 

967 ), 

968 ) 

969 ) 

970 assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT 

971 assert e.value.details() == "Invalid coordinate." 

972 

973 

974def test_signup_token_regression(db): 

975 # Repro steps: 

976 # 1. Start a signup 

977 # 2. Confirm the email 

978 # 3. Start a new signup with the same email 

979 # Expected: send a link to the email to continue signing up. 

980 # Actual: `AttributeError: 'SignupFlow' object has no attribute 'token'` 

981 

982 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

983 

984 # 1. Start a signup 

985 with auth_api_session() as (auth_api, metadata_interceptor): 

986 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

987 flow_token = res.flow_token 

988 assert flow_token 

989 

990 # 2. Confirm the email 

991 with session_scope() as session: 

992 email_token = session.execute( 

993 select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token) 

994 ).scalar_one() 

995 

996 with auth_api_session() as (auth_api, metadata_interceptor): 

997 auth_api.SignupFlow( 

998 auth_pb2.SignupFlowReq( 

999 flow_token=flow_token, 

1000 email_token=email_token, 

1001 ) 

1002 ) 

1003 

1004 # 3. Start a new signup with the same email 

1005 with auth_api_session() as (auth_api, metadata_interceptor): 

1006 with pytest.raises(grpc.RpcError) as e: 

1007 auth_api.SignupFlow(auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="frodo", email=testing_email))) 

1008 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1009 assert e.value.details() == "Please check your email for a link to continue signing up." 

1010 

1011 

1012@pytest.mark.parametrize("opt_out", [True, False]) 

1013def test_opt_out_of_newsletter(db, opt_out): 

1014 with auth_api_session() as (auth_api, metadata_interceptor): 

1015 res = auth_api.SignupFlow( 

1016 auth_pb2.SignupFlowReq( 

1017 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

1018 account=auth_pb2.SignupAccount( 

1019 username="frodo", 

1020 password="a very insecure password", 

1021 birthdate="1970-01-01", 

1022 gender="Bot", 

1023 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1024 city="New York City", 

1025 lat=40.7331, 

1026 lng=-73.9778, 

1027 radius=500, 

1028 accept_tos=True, 

1029 opt_out_of_newsletter=opt_out, 

1030 ), 

1031 feedback=auth_pb2.ContributorForm(), 

1032 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1033 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]), 

1034 ) 

1035 ) 

1036 

1037 with session_scope() as session: 

1038 email_token = session.execute( 

1039 select(SignupFlow.email_token).where(SignupFlow.flow_token == res.flow_token) 

1040 ).scalar_one() 

1041 

1042 with auth_api_session() as (auth_api, metadata_interceptor): 

1043 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1044 

1045 user_id = res.auth_res.user_id 

1046 

1047 with session_scope() as session: 

1048 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

1049 assert not user.in_sync_with_newsletter 

1050 assert user.opt_out_of_newsletter == opt_out 

1051 

1052 

1053def test_GetAuthState(db): 

1054 user, token = generate_user() 

1055 jailed_user, jailed_token = generate_user(accepted_tos=0) 

1056 

1057 with auth_api_session() as (auth_api, metadata_interceptor): 

1058 res = auth_api.GetAuthState(empty_pb2.Empty()) 

1059 assert not res.logged_in 

1060 assert not res.HasField("auth_res") 

1061 

1062 with auth_api_session() as (auth_api, metadata_interceptor): 

1063 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1064 assert res.logged_in 

1065 assert res.HasField("auth_res") 

1066 assert res.auth_res.user_id == user.id 

1067 assert not res.auth_res.jailed 

1068 

1069 auth_api.Deauthenticate(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1070 

1071 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={token}"),)) 

1072 assert not res.logged_in 

1073 assert not res.HasField("auth_res") 

1074 

1075 with auth_api_session() as (auth_api, metadata_interceptor): 

1076 res = auth_api.GetAuthState(empty_pb2.Empty(), metadata=(("cookie", f"couchers-sesh={jailed_token}"),)) 

1077 assert res.logged_in 

1078 assert res.HasField("auth_res") 

1079 assert res.auth_res.user_id == jailed_user.id 

1080 assert res.auth_res.jailed 

1081 

1082 

1083def test_signup_no_feedback_regression(db): 

1084 """ 

1085 When we first remove the feedback form, the backned was saying it's not needed but was not completing the signup, 

1086 this regression test checks that. 

1087 """ 

1088 with auth_api_session() as (auth_api, metadata_interceptor): 

1089 res = auth_api.SignupFlow( 

1090 auth_pb2.SignupFlowReq( 

1091 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

1092 account=auth_pb2.SignupAccount( 

1093 username="frodo", 

1094 password="a very insecure password", 

1095 birthdate="1970-01-01", 

1096 gender="Bot", 

1097 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1098 city="New York City", 

1099 lat=40.7331, 

1100 lng=-73.9778, 

1101 radius=500, 

1102 accept_tos=True, 

1103 ), 

1104 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1105 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]), 

1106 ) 

1107 ) 

1108 

1109 flow_token = res.flow_token 

1110 

1111 assert res.flow_token 

1112 assert not res.HasField("auth_res") 

1113 assert not res.need_basic 

1114 assert not res.need_account 

1115 assert not res.need_feedback 

1116 assert not res.need_motivations 

1117 assert res.need_verify_email 

1118 

1119 # read out the signup token directly from the database for now 

1120 with session_scope() as session: 

1121 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

1122 assert flow.email_sent 

1123 assert not flow.email_verified 

1124 email_token = flow.email_token 

1125 

1126 with auth_api_session() as (auth_api, metadata_interceptor): 

1127 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1128 

1129 assert not res.flow_token 

1130 assert res.HasField("auth_res") 

1131 assert res.auth_res.user_id 

1132 assert not res.auth_res.jailed 

1133 assert not res.need_basic 

1134 assert not res.need_account 

1135 assert not res.need_feedback 

1136 assert not res.need_motivations 

1137 assert not res.need_verify_email 

1138 

1139 # make sure we got the right token in a cookie 

1140 with session_scope() as session: 

1141 token = session.execute( 

1142 select(UserSession.token).join(User, UserSession.user_id == User.id).where(User.username == "frodo") 

1143 ).scalar_one() 

1144 sesh, uid = get_session_cookie_tokens(metadata_interceptor) 

1145 assert sesh == token 

1146 

1147 

1148def test_banned_username(db): 

1149 testing_email = f"{random_hex(12)}@couchers.org.invalid" 

1150 with auth_api_session() as (auth_api, metadata_interceptor): 

1151 reply = auth_api.SignupFlow( 

1152 auth_pb2.SignupFlowReq(basic=auth_pb2.SignupBasic(name="Tester", email=testing_email)) 

1153 ) 

1154 

1155 flow_token = reply.flow_token 

1156 

1157 with auth_api_session() as (auth_api, metadata_interceptor): 

1158 # Banned username 

1159 with pytest.raises(grpc.RpcError) as e: 

1160 auth_api.SignupFlow( 

1161 auth_pb2.SignupFlowReq( 

1162 flow_token=flow_token, 

1163 account=auth_pb2.SignupAccount( 

1164 username="thecouchersadminaccount", 

1165 password="a very insecure password", 

1166 city="Minas Tirith", 

1167 birthdate="1980-12-31", 

1168 gender="Robot", 

1169 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1170 lat=1, 

1171 lng=1, 

1172 radius=100, 

1173 accept_tos=True, 

1174 ), 

1175 ) 

1176 ) 

1177 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1178 assert e.value.details() == "Sorry, that username isn't available." 

1179 

1180 

1181# tests for ConfirmChangeEmail within test_account.py tests for test_ChangeEmail_* 

1182 

1183 

1184def test_GetInviteCodeInfo(db): 

1185 user, token = generate_user(complete_profile=True) 

1186 

1187 with account_session(token) as account: 

1188 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code 

1189 

1190 with auth_api_session() as (auth, _): 

1191 res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code)) 

1192 assert res.name == user.name 

1193 assert res.username == user.username 

1194 # Avatar URL should be a thumbnail URL with a hashed filename 

1195 assert "/img/thumbnail/" in res.avatar_url 

1196 assert res.avatar_url.endswith(".jpg") 

1197 # Verify the hashed filename looks correct (64 char hex hash) 

1198 assert len(res.avatar_url.split("/")[-1].replace(".jpg", "")) == 64 

1199 assert res.url == urls.invite_code_link(code=code) 

1200 

1201 

1202def test_GetInviteCodeInfo_no_avatar(db): 

1203 user, token = generate_user(complete_profile=False) 

1204 

1205 with account_session(token) as account: 

1206 code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code 

1207 

1208 with auth_api_session() as (auth, _): 

1209 res = auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code=code)) 

1210 assert res.name == user.name 

1211 assert res.username == user.username 

1212 assert res.avatar_url == "" 

1213 assert res.url == urls.invite_code_link(code=code) 

1214 

1215 

1216def test_GetInviteCodeInfo_not_found(db): 

1217 generate_user() 

1218 

1219 with auth_api_session() as (auth, _): 

1220 with pytest.raises(grpc.RpcError) as e: 

1221 auth.GetInviteCodeInfo(auth_pb2.GetInviteCodeInfoReq(code="BADCODE")) 

1222 assert e.value.code() == grpc.StatusCode.NOT_FOUND 

1223 assert e.value.details() == "Invite code not found." 

1224 

1225 

1226def test_SignupFlow_invite_code(db): 

1227 user, token = generate_user() 

1228 

1229 with account_session(token) as account: 

1230 invite_code = account.CreateInviteCode(account_pb2.CreateInviteCodeReq()).code 

1231 

1232 with auth_api_session() as (auth_api, _): 

1233 # Signup basic step with invite code 

1234 res = auth_api.SignupFlow( 

1235 auth_pb2.SignupFlowReq( 

1236 basic=auth_pb2.SignupBasic( 

1237 name="Test User", 

1238 email="inviteuser@example.com", 

1239 invite_code=invite_code, 

1240 ) 

1241 ) 

1242 ) 

1243 flow_token = res.flow_token 

1244 assert flow_token 

1245 

1246 # Confirm email 

1247 with session_scope() as session: 

1248 email_token = session.execute( 

1249 select(SignupFlow.email_token).where(SignupFlow.flow_token == flow_token) 

1250 ).scalar_one() 

1251 

1252 auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1253 

1254 # Signup account step 

1255 auth_api.SignupFlow( 

1256 auth_pb2.SignupFlowReq( 

1257 flow_token=flow_token, 

1258 account=auth_pb2.SignupAccount( 

1259 username="invited_user", 

1260 password="secure password", 

1261 birthdate="1990-01-01", 

1262 gender="Other", 

1263 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1264 city="Example City", 

1265 lat=1, 

1266 lng=5, 

1267 radius=100, 

1268 accept_tos=True, 

1269 ), 

1270 feedback=auth_pb2.ContributorForm(), 

1271 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1272 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]), 

1273 ) 

1274 ) 

1275 

1276 # Check that invite_code_id is stored in the final User object 

1277 with session_scope() as session: 

1278 invite_code_id = session.execute( 

1279 select(User.invite_code_id).where(User.username == "invited_user") 

1280 ).scalar_one() 

1281 assert invite_code_id == invite_code 

1282 

1283 

1284def test_signup_with_motivations(db): 

1285 """ 

1286 Test signup flow with the new motivations step (heard_about_couchers and signup_motivations) 

1287 """ 

1288 with auth_api_session() as (auth_api, metadata_interceptor): 

1289 res = auth_api.SignupFlow( 

1290 auth_pb2.SignupFlowReq( 

1291 basic=auth_pb2.SignupBasic(name="testing", email="email@couchers.org.invalid"), 

1292 account=auth_pb2.SignupAccount( 

1293 username="intentuser", 

1294 password="a very insecure password", 

1295 birthdate="1970-01-01", 

1296 gender="Bot", 

1297 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1298 city="New York City", 

1299 lat=40.7331, 

1300 lng=-73.9778, 

1301 radius=500, 

1302 accept_tos=True, 

1303 ), 

1304 motivations=auth_pb2.SignupMotivations( 

1305 heard_about_couchers="friend", 

1306 motivations=["hosting", "surfing", "events"], 

1307 ), 

1308 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1309 ) 

1310 ) 

1311 

1312 flow_token = res.flow_token 

1313 assert flow_token 

1314 assert not res.HasField("auth_res") 

1315 assert res.need_verify_email 

1316 

1317 # Verify the motivations are stored in the SignupFlow 

1318 with session_scope() as session: 

1319 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

1320 assert flow.heard_about_couchers == "friend" 

1321 assert set(flow.signup_motivations) == {"hosting", "surfing", "events"} 

1322 email_token = flow.email_token 

1323 

1324 # Complete signup by verifying email 

1325 with auth_api_session() as (auth_api, metadata_interceptor): 

1326 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1327 

1328 assert res.HasField("auth_res") 

1329 user_id = res.auth_res.user_id 

1330 

1331 # Verify the motivations are transferred to the User object 

1332 with session_scope() as session: 

1333 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

1334 assert user.heard_about_couchers == "friend" 

1335 assert user.signup_motivations is not None 

1336 assert set(user.signup_motivations) == {"hosting", "surfing", "events"} 

1337 

1338 

1339def test_signup_motivations_incremental(db): 

1340 """ 

1341 Test that motivations can be submitted incrementally as a separate step 

1342 """ 

1343 with auth_api_session() as (auth_api, metadata_interceptor): 

1344 # First, basic signup 

1345 res = auth_api.SignupFlow( 

1346 auth_pb2.SignupFlowReq( 

1347 basic=auth_pb2.SignupBasic(name="testing", email="email2@couchers.org.invalid"), 

1348 ) 

1349 ) 

1350 

1351 flow_token = res.flow_token 

1352 assert flow_token 

1353 assert res.need_account 

1354 assert res.need_motivations # New field 

1355 

1356 # Submit motivations separately 

1357 with auth_api_session() as (auth_api, metadata_interceptor): 

1358 res = auth_api.SignupFlow( 

1359 auth_pb2.SignupFlowReq( 

1360 flow_token=flow_token, 

1361 motivations=auth_pb2.SignupMotivations( 

1362 heard_about_couchers="social_media", 

1363 motivations=["surfing"], 

1364 ), 

1365 ) 

1366 ) 

1367 

1368 assert res.flow_token == flow_token 

1369 assert not res.need_motivations # Should be filled now 

1370 assert res.need_account # Still need account 

1371 

1372 # Verify motivations are stored 

1373 with session_scope() as session: 

1374 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

1375 assert flow.heard_about_couchers == "social_media" 

1376 assert flow.signup_motivations == ["surfing"] 

1377 

1378 

1379def test_signup_motivations_cannot_be_refilled(db): 

1380 """ 

1381 Test that motivations cannot be submitted twice 

1382 """ 

1383 with auth_api_session() as (auth_api, metadata_interceptor): 

1384 res = auth_api.SignupFlow( 

1385 auth_pb2.SignupFlowReq( 

1386 basic=auth_pb2.SignupBasic(name="testing", email="email3@couchers.org.invalid"), 

1387 motivations=auth_pb2.SignupMotivations( 

1388 heard_about_couchers="friend", 

1389 motivations=["hosting"], 

1390 ), 

1391 ) 

1392 ) 

1393 

1394 flow_token = res.flow_token 

1395 

1396 # Try to submit motivations again - should fail 

1397 with auth_api_session() as (auth_api, metadata_interceptor): 

1398 with pytest.raises(grpc.RpcError) as e: 

1399 auth_api.SignupFlow( 

1400 auth_pb2.SignupFlowReq( 

1401 flow_token=flow_token, 

1402 motivations=auth_pb2.SignupMotivations( 

1403 heard_about_couchers="different_source", 

1404 motivations=["surfing"], 

1405 ), 

1406 ) 

1407 ) 

1408 assert e.value.code() == grpc.StatusCode.FAILED_PRECONDITION 

1409 assert e.value.details() == "You've already told us about why you are signing up." 

1410 

1411 

1412def test_signup_motivations_required(db): 

1413 """ 

1414 Test that signup cannot complete without providing motivations 

1415 """ 

1416 with auth_api_session() as (auth_api, metadata_interceptor): 

1417 res = auth_api.SignupFlow( 

1418 auth_pb2.SignupFlowReq( 

1419 basic=auth_pb2.SignupBasic(name="testing", email="email4@couchers.org.invalid"), 

1420 account=auth_pb2.SignupAccount( 

1421 username="nointents", 

1422 password="a very insecure password", 

1423 birthdate="1970-01-01", 

1424 gender="Bot", 

1425 hosting_status=api_pb2.HOSTING_STATUS_CAN_HOST, 

1426 city="New York City", 

1427 lat=40.7331, 

1428 lng=-73.9778, 

1429 radius=500, 

1430 accept_tos=True, 

1431 ), 

1432 # No motivations provided 

1433 accept_community_guidelines=wrappers_pb2.BoolValue(value=True), 

1434 ) 

1435 ) 

1436 

1437 flow_token = res.flow_token 

1438 assert not res.HasField("auth_res") 

1439 assert res.need_motivations # Intents still required 

1440 

1441 with session_scope() as session: 

1442 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

1443 email_token = flow.email_token 

1444 

1445 # Verify email - signup still not complete without motivations 

1446 with auth_api_session() as (auth_api, metadata_interceptor): 

1447 res = auth_api.SignupFlow(auth_pb2.SignupFlowReq(email_token=email_token)) 

1448 

1449 assert not res.HasField("auth_res") 

1450 assert res.need_motivations 

1451 

1452 # Now submit motivations 

1453 with auth_api_session() as (auth_api, metadata_interceptor): 

1454 res = auth_api.SignupFlow( 

1455 auth_pb2.SignupFlowReq( 

1456 flow_token=flow_token, 

1457 motivations=auth_pb2.SignupMotivations(motivations=["surfing"]), 

1458 ) 

1459 ) 

1460 

1461 assert res.HasField("auth_res") 

1462 user_id = res.auth_res.user_id 

1463 

1464 with session_scope() as session: 

1465 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

1466 assert user.signup_motivations == ["surfing"] 

1467 

1468 

1469def test_signup_motivations_all_options(db): 

1470 """ 

1471 Test all the different motivation options 

1472 """ 

1473 with auth_api_session() as (auth_api, metadata_interceptor): 

1474 res = auth_api.SignupFlow( 

1475 auth_pb2.SignupFlowReq( 

1476 basic=auth_pb2.SignupBasic(name="testing", email="email5@couchers.org.invalid"), 

1477 motivations=auth_pb2.SignupMotivations( 

1478 heard_about_couchers="other", 

1479 motivations=["hosting", "surfing", "events"], 

1480 ), 

1481 ) 

1482 ) 

1483 

1484 flow_token = res.flow_token 

1485 

1486 with session_scope() as session: 

1487 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

1488 assert flow.heard_about_couchers == "other" 

1489 assert set(flow.signup_motivations) == {"hosting", "surfing", "events"} 

1490 

1491 

1492def test_signup_motivations_empty_motivations_list(db): 

1493 """ 

1494 Test that providing heard_about but empty motivations list is valid 

1495 """ 

1496 with auth_api_session() as (auth_api, metadata_interceptor): 

1497 res = auth_api.SignupFlow( 

1498 auth_pb2.SignupFlowReq( 

1499 basic=auth_pb2.SignupBasic(name="testing", email="email6@couchers.org.invalid"), 

1500 motivations=auth_pb2.SignupMotivations( 

1501 heard_about_couchers="former_cs_member", 

1502 motivations=[], # No specific motivations selected 

1503 ), 

1504 ) 

1505 ) 

1506 

1507 flow_token = res.flow_token 

1508 

1509 with session_scope() as session: 

1510 flow = session.execute(select(SignupFlow).where(SignupFlow.flow_token == flow_token)).scalar_one() 

1511 assert flow.heard_about_couchers == "former_cs_member" 

1512 assert flow.signup_motivations == []