Coverage for src/tests/test_fixtures.py: 98%

522 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 15:07 +0000

1import os 

2from concurrent import futures 

3from contextlib import contextmanager 

4from dataclasses import dataclass 

5from datetime import date, timedelta 

6from pathlib import Path 

7from unittest.mock import patch 

8 

9import grpc 

10import pytest 

11from sqlalchemy.orm import close_all_sessions 

12from sqlalchemy.sql import or_, text 

13 

14from couchers.config import config 

15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

16from couchers.crypto import random_hex 

17from couchers.db import _get_base_engine, session_scope 

18from couchers.descriptor_pool import get_descriptor_pool 

19from couchers.interceptors import ( 

20 AuthValidatorInterceptor, 

21 CookieInterceptor, 

22 SessionInterceptor, 

23 _try_get_and_update_user_details, 

24) 

25from couchers.jobs.worker import process_job 

26from couchers.models import ( 

27 Base, 

28 FriendRelationship, 

29 FriendStatus, 

30 HostingStatus, 

31 Language, 

32 LanguageAbility, 

33 LanguageFluency, 

34 MeetupStatus, 

35 PassportSex, 

36 Region, 

37 RegionLived, 

38 RegionVisited, 

39 StrongVerificationAttempt, 

40 StrongVerificationAttemptStatus, 

41 Upload, 

42 User, 

43 UserBlock, 

44 UserSession, 

45) 

46from couchers.servicers.account import Account, Iris 

47from couchers.servicers.admin import Admin 

48from couchers.servicers.api import API 

49from couchers.servicers.auth import Auth, create_session 

50from couchers.servicers.blocking import Blocking 

51from couchers.servicers.bugs import Bugs 

52from couchers.servicers.communities import Communities 

53from couchers.servicers.conversations import Conversations 

54from couchers.servicers.discussions import Discussions 

55from couchers.servicers.donations import Donations, Stripe 

56from couchers.servicers.events import Events 

57from couchers.servicers.gis import GIS 

58from couchers.servicers.groups import Groups 

59from couchers.servicers.jail import Jail 

60from couchers.servicers.media import Media, get_media_auth_interceptor 

61from couchers.servicers.notifications import Notifications 

62from couchers.servicers.pages import Pages 

63from couchers.servicers.public import Public 

64from couchers.servicers.references import References 

65from couchers.servicers.reporting import Reporting 

66from couchers.servicers.requests import Requests 

67from couchers.servicers.resources import Resources 

68from couchers.servicers.search import Search 

69from couchers.servicers.threads import Threads 

70from couchers.sql import couchers_select as select 

71from couchers.utils import create_coordinate, now 

72from proto import ( 

73 account_pb2_grpc, 

74 admin_pb2_grpc, 

75 annotations_pb2, 

76 api_pb2_grpc, 

77 auth_pb2_grpc, 

78 blocking_pb2_grpc, 

79 bugs_pb2_grpc, 

80 communities_pb2_grpc, 

81 conversations_pb2_grpc, 

82 discussions_pb2_grpc, 

83 donations_pb2_grpc, 

84 events_pb2_grpc, 

85 gis_pb2_grpc, 

86 groups_pb2_grpc, 

87 iris_pb2_grpc, 

88 jail_pb2_grpc, 

89 media_pb2_grpc, 

90 notifications_pb2_grpc, 

91 pages_pb2_grpc, 

92 public_pb2_grpc, 

93 references_pb2_grpc, 

94 reporting_pb2_grpc, 

95 requests_pb2_grpc, 

96 resources_pb2_grpc, 

97 search_pb2_grpc, 

98 stripe_pb2_grpc, 

99 threads_pb2_grpc, 

100) 

101 

102 

def drop_all():
    """drop everything currently in the database"""
    with session_scope() as session:
        # postgis is required for all the Geographic Information System (GIS) stuff
        # pg_trgm is required for trigram based search
        # btree_gist is required for gist-based exclusion constraints
        statements = [
            "DROP SCHEMA IF EXISTS public CASCADE;",
            "DROP SCHEMA IF EXISTS logging CASCADE;",
            "DROP EXTENSION IF EXISTS postgis CASCADE;",
            "CREATE SCHEMA public;",
            "CREATE SCHEMA logging;",
            "CREATE EXTENSION postgis;",
            "CREATE EXTENSION pg_trgm;",
            "CREATE EXTENSION btree_gist;",
        ]
        session.execute(text("".join(statements)))

    # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise
    # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
    # and similar errors
    _get_base_engine().dispose()

    close_all_sessions()

128 

129 

def create_schema_from_models():
    """
    Create everything from the current models, not incrementally
    through migrations.
    """
    # the slugify helper function must exist before the tables are created
    slugify_sql = (Path(__file__).parent / "slugify.sql").read_text()
    with session_scope() as session:
        session.execute(text(slugify_sql))

    Base.metadata.create_all(_get_base_engine())

142 

143 

def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database

    Seeds a fixed set of regions, languages and (fake) timezone areas.
    """
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # the real timezone areas are huge; tests use a small fake version
    tz_sql = (Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake").read_text()

    session.add_all(Region(code=code, name=name) for code, name in regions)
    session.add_all(Language(code=code, name=name) for code, name in languages)

    session.execute(text(tz_sql))

212 

213 

def recreate_database():
    """
    Connect to a running Postgres database, build it using metadata.create_all()
    """

    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # drop everything currently in the database
    drop_all()

    # create everything from the current models, not incrementally through migrations
    create_schema_from_models()

    # seed the fresh schema with the fixed testing regions/languages/timezones
    with session_scope() as session:
        populate_testing_resources(session)

230 

231 

@pytest.fixture()
def db():
    """
    Pytest fixture to connect to a running Postgres database and build it using metadata.create_all()

    Function-scoped: each test using this fixture gets a freshly rebuilt database.
    """

    recreate_database()

239 

240 

def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Any extra keyword arguments override the default User column values below.

    Returns:
        (user, token): the detached User object and a session token for that user
    """
    # NOTE(review): `auth` appears unused in this function — possibly leftover
    auth = Auth()

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "Woman",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied overrides win over the defaults
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        session.flush()

        # give every generated user some regions/languages so profile tests have data
        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        # create_session needs a grpc-like context; a stub with empty metadata is enough here
        class _DummyContext:
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # an avatar plus a long enough about_me is what completes the profile
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            # fabricate a succeeded verification attempt consistent with the user's details
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # allows detaches the user from the session, allowing its use outside this session
        session.expunge(user)

        return user, token

368 

369 

def get_user_id_and_token(session, username):
    """Look up a user by username and return their (user_id, session token)."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token

374 

375 

def make_friends(user1, user2):
    """Create an accepted friendship from user1 to user2."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )

384 

385 

def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()

394 

395 

def make_user_invisible(user_id):
    """Ban the given user (banned users are hidden from others)."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True

399 

400 

# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between user1 and user2 (in either direction),
    detached from the session, or None if there isn't one.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # BUG FIX: the two conditions must be combined with the SQLAlchemy `&`
                    # operator. The previous code used Python's `and`, which evaluates the
                    # truthiness of the first BinaryExpression and so silently dropped the
                    # `to_user_id` condition, matching any relationship involving either user.
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # scalar_one_or_none may return None; expunging None would raise
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

415 

416 

class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata

    Args:
        token: the session token to place in the couchers-sesh cookie
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        # grpc calls this per-RPC; supply the cookie header, no error
        callback((("cookie", f"couchers-sesh={self.token}"),), None)

427 

428 

@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields:
        (stub, metadata_interceptor): the AuthStub plus an interceptor exposing the
        response headers of the most recent call (e.g. to inspect Set-Cookie).
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0 lets the OS pick a free port; add_secure_port returns the actual one
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:

                # client-side interceptor that records the initial metadata (response
                # headers) of the latest RPC so tests can assert on cookies etc.
                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    def __init__(self):
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        self.latest_headers = dict(call.initial_metadata())
                        self.latest_header_raw = call.initial_metadata()
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            # stop(None) cancels in-flight RPCs; wait for the shutdown to complete
            server.stop(None).wait()

462 

463 

@contextmanager
def api_session(token):
    """Yield an API stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    yield api_pb2_grpc.APIStub(chan)

472 

473 

@contextmanager
def real_api_session(token):
    """Yield an API stub over a real localhost gRPC server, authed via the `token` cookie."""
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            # cancel outstanding RPCs and wait for full shutdown
            server.stop(None).wait()

493 

494 

@contextmanager
def real_admin_session(token):
    """Yield an Admin stub over a real localhost gRPC server, authed via the `token` cookie."""
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            server.stop(None).wait()

514 

515 

@contextmanager
def real_account_session(token):
    """Yield an Account stub over a real localhost gRPC server, authed via the `token` cookie."""
    with futures.ThreadPoolExecutor(1) as pool:
        # Account additionally needs the cookie interceptor (it reads/sets cookies)
        server = grpc.server(
            pool, interceptors=[AuthValidatorInterceptor(), CookieInterceptor(), SessionInterceptor()]
        )
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            server.stop(None).wait()

537 

538 

@contextmanager
def real_jail_session(token):
    """Yield a Jail stub over a real localhost gRPC server, authed via the `token` cookie."""
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            server.stop(None).wait()

558 

559 

@contextmanager
def gis_session(token):
    """Yield a GIS stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), chan)
    yield gis_pb2_grpc.GISStub(chan)

565 

566 

@contextmanager
def public_session():
    """Yield a Public stub backed by an unauthenticated in-process fake channel."""
    chan = fake_channel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), chan)
    yield public_pb2_grpc.PublicStub(chan)

572 

573 

class FakeRpcError(grpc.RpcError):
    """Minimal grpc.RpcError stand-in, raised by FakeChannel.abort()."""

    def __init__(self, code, details):
        self._code = code
        self._details = details

    def code(self):
        # mirrors the grpc.Call.code() accessor
        return self._code

    def details(self):
        # mirrors the grpc.Call.details() accessor
        return self._details

584 

585 

def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """
    Assert that the caller is allowed to invoke `method`, based on the auth_level
    option declared on the service in its proto definition.

    Fails the test (AssertionError) on violation.

    Note: `token_expiry` is currently unused here.
    """
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, method_name = method.split("/")

    # read the service-level auth_level annotation from the descriptor pool
    service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = service_options.Extensions[annotations_pb2.auth_level]
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    if not user_id:
        # unauthenticated callers may only hit open APIs
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
    else:
        assert not (auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser), (
            "Non-superuser tried to call superuser API"
        )
        assert not (
            is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED]
        ), "User is jailed but tried to call non-open/non-jailed API"

609 

610 

class FakeChannel:
    """
    In-process stand-in for both a gRPC server and channel: servicers register their
    handlers on it, and `unary_unary` returns a callable that invokes the handler
    directly, with full request/response (de)serialization.

    Also plays the role of the servicer `context` (hence `abort`).
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def abort(self, code, details):
        # like grpc.ServicerContext.abort: terminate the "RPC" by raising
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        # grpc-internal helper validates the handlers like the real server would
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        handler = self.handlers[uri]

        # enforce the same auth-level rules the real interceptors would apply
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                response = handler.unary_unary(request, self, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler

645 

646 

def fake_channel(token=None):
    """Build a FakeChannel; if `token` is given, resolve the caller's auth details from it."""
    if not token:
        return FakeChannel()

    user_id, is_jailed, is_superuser, token_expiry, _ui_language_preference = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)

654 

655 

@contextmanager
def conversations_session(token):
    """Yield a Conversations stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    yield conversations_pb2_grpc.ConversationsStub(chan)

664 

665 

@contextmanager
def requests_session(token):
    """Yield a Requests stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    yield requests_pb2_grpc.RequestsStub(chan)

674 

675 

@contextmanager
def threads_session(token):
    """Yield a Threads stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    yield threads_pb2_grpc.ThreadsStub(chan)

681 

682 

@contextmanager
def discussions_session(token):
    """Yield a Discussions stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    yield discussions_pb2_grpc.DiscussionsStub(chan)

688 

689 

@contextmanager
def donations_session(token):
    """Yield a Donations stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    yield donations_pb2_grpc.DonationsStub(chan)

695 

696 

@contextmanager
def real_stripe_session():
    """Yield a Stripe stub over a real localhost gRPC server (no auth cookie)."""
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            server.stop(None).wait()

715 

716 

@contextmanager
def real_iris_session():
    """Yield an Iris stub over a real localhost gRPC server (no auth cookie)."""
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            server.stop(None).wait()

732 

733 

@contextmanager
def pages_session(token):
    """Yield a Pages stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    yield pages_pb2_grpc.PagesStub(chan)

739 

740 

@contextmanager
def communities_session(token):
    """Yield a Communities stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    yield communities_pb2_grpc.CommunitiesStub(chan)

746 

747 

@contextmanager
def groups_session(token):
    """Yield a Groups stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    yield groups_pb2_grpc.GroupsStub(chan)

753 

754 

@contextmanager
def blocking_session(token):
    """Yield a Blocking stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    yield blocking_pb2_grpc.BlockingStub(chan)

760 

761 

@contextmanager
def notifications_session(token):
    """Yield a Notifications stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    yield notifications_pb2_grpc.NotificationsStub(chan)

767 

768 

@contextmanager
def account_session(token):
    """Yield an Account stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    yield account_pb2_grpc.AccountStub(chan)

777 

778 

@contextmanager
def search_session(token):
    """Yield a Search stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    yield search_pb2_grpc.SearchStub(chan)

787 

788 

@contextmanager
def references_session(token):
    """Yield a References stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    yield references_pb2_grpc.ReferencesStub(chan)

797 

798 

@contextmanager
def reporting_session(token):
    """Yield a Reporting stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    yield reporting_pb2_grpc.ReportingStub(chan)

804 

805 

@contextmanager
def events_session(token):
    """Yield an Events stub backed by an in-process fake channel, authed with `token`."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    yield events_pb2_grpc.EventsStub(chan)

811 

812 

@contextmanager
def bugs_session(token=None):
    """Yield a Bugs stub backed by an in-process fake channel (optionally authed)."""
    chan = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    yield bugs_pb2_grpc.BugsStub(chan)

818 

819 

@contextmanager
def resources_session():
    """Yield a Resources stub backed by an unauthenticated in-process fake channel."""
    chan = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    yield resources_pb2_grpc.ResourcesStub(chan)

825 

826 

@contextmanager
def media_session(bearer_token):
    """Yield a Media stub over a real localhost gRPC server, authed with `bearer_token`."""
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(
            pool, interceptors=[get_media_auth_interceptor(bearer_token), SessionInterceptor()]
        )
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.access_token_call_credentials(bearer_token),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            server.stop(None).wait()

849 

850 

@pytest.fixture(scope="class")
def testconfig():
    """
    Swap in a known-good testing configuration for the duration of a test class,
    restoring the previous config on teardown.
    """
    # snapshot the current config so it can be restored afterwards
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config["IN_TEST"] = True

    config["DEV"] = True
    config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
    config["VERSION"] = "testing_version"
    config["BASE_URL"] = "http://localhost:3000"
    config["BACKEND_BASE_URL"] = "http://localhost:8888"
    config["CONSOLE_BASE_URL"] = "http://localhost:8888"
    config["COOKIE_DOMAIN"] = "localhost"

    # SMS sending disabled in tests
    config["ENABLE_SMS"] = False
    config["SMS_SENDER_ID"] = "invalid"

    # email sending disabled; addresses use the reserved .invalid TLD so nothing real is reachable
    config["ENABLE_EMAIL"] = False
    config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
    config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
    config["NOTIFICATION_PREFIX"] = "[TEST] "
    config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
    config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
    config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"

    # donations/Stripe disabled; keys intentionally empty
    config["ENABLE_DONATIONS"] = False
    config["STRIPE_API_KEY"] = ""
    config["STRIPE_WEBHOOK_SECRET"] = ""
    config["STRIPE_RECURRING_PRODUCT_ID"] = ""

    config["ENABLE_STRONG_VERIFICATION"] = False
    config["IRIS_ID_PUBKEY"] = ""
    config["IRIS_ID_SECRET"] = ""
    # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
    config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    config["SMTP_HOST"] = "localhost"
    config["SMTP_PORT"] = 587
    config["SMTP_USERNAME"] = "username"
    config["SMTP_PASSWORD"] = "password"

    # media uploads enabled against a local dummy media server
    config["ENABLE_MEDIA"] = True
    config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
        "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
    )
    config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
    config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001"
    config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001"

    config["BUG_TOOL_ENABLED"] = False
    config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
    config["BUG_TOOL_GITHUB_USERNAME"] = "user"
    config["BUG_TOOL_GITHUB_TOKEN"] = "token"

    config["LISTMONK_ENABLED"] = False
    config["LISTMONK_BASE_URL"] = "https://localhost"
    config["LISTMONK_API_USERNAME"] = "..."
    config["LISTMONK_API_KEY"] = "..."
    config["LISTMONK_LIST_ID"] = 3

    config["PUSH_NOTIFICATIONS_ENABLED"] = True
    config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
    config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"

    config["ACTIVENESS_PROBES_ENABLED"] = True

    yield None

    # teardown: restore whatever config was active before
    config.clear()
    config.update(prevconfig)

925 

926 

def run_migration_test():
    """Whether the migration test should run, per the RUN_MIGRATION_TEST env var.

    Returns True only when the variable is set to "true" (case-insensitive);
    unset or any other value disables the migration test.
    """
    flag = os.environ.get("RUN_MIGRATION_TEST", "false")
    return flag.lower() == "true"

929 

930 

@pytest.fixture
def fast_passwords():
    """Replace nacl's slow password hashing with a trivial reversible stand-in.

    Password hashing is expensive by design, which slows down the tests; while
    this fixture is active, hashing is a cheap prefix operation and verification
    is a plain equality check, preserving the hash/verify round-trip contract.
    """

    def cheap_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def cheap_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == cheap_hash(password)

    with (
        patch("couchers.crypto.nacl.pwhash.verify", cheap_verify),
        patch("couchers.crypto.nacl.pwhash.str", cheap_hash),
    ):
        yield

945 

946 

def process_jobs():
    """Run queued background jobs one at a time until the queue is drained."""
    while True:
        # process_job() returns falsy once there is no more work to do
        if not process_job():
            break

950 

951 

@contextmanager
def mock_notification_email():
    """Capture queued notification emails instead of sending them.

    Yields the patch mock so tests can inspect what would have been emailed;
    after the with-block exits, the background job queue is drained so any
    notification jobs enqueued inside the block still run to completion.
    """
    with patch("couchers.email._queue_email") as queued:
        yield queued
    process_jobs()

957 

958 

@dataclass
class EmailData:
    """Snapshot of one queued email's fields, for assertions in tests.

    Populated by email_fields() from the keyword arguments captured on the
    mock_notification_email mock.
    """

    sender_name: str
    sender_email: str
    recipient: str
    subject: str
    plain: str  # plain-text body
    html: str  # HTML body
    source_data: str
    list_unsubscribe_header: str

969 

970 

def email_fields(mock, call_ix=0):
    """Extract the call_ix-th email captured on the mock into an EmailData.

    Missing keyword arguments come back as None via dict.get.
    """
    keyword_args = mock.call_args_list[call_ix].kwargs
    return EmailData(
        sender_name=keyword_args.get("sender_name"),
        sender_email=keyword_args.get("sender_email"),
        recipient=keyword_args.get("recipient"),
        subject=keyword_args.get("subject"),
        plain=keyword_args.get("plain"),
        html=keyword_args.get("html"),
        source_data=keyword_args.get("source_data"),
        list_unsubscribe_header=keyword_args.get("list_unsubscribe_header"),
    )

983 

984 

@pytest.fixture
def push_collector():
    """
    See test_SendTestPushNotification for an example on how to use this fixture
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            # only called when normal attribute lookup fails; fall through to
            # the captured kwargs, mirroring dict-style access
            if attr in self.kwargs:
                return self.kwargs[attr]
            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'")

        def __repr__(self):
            rendered = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({rendered})"

    class PushCollector:
        def __init__(self):
            # pairs of (user_id, push), in arrival order
            self.pushes = []

        def by_user(self, user_id):
            """Return every Push sent to the given user, oldest first."""
            matches = []
            for uid, push in self.pushes:
                if uid == user_id:
                    matches.append(push)
            return matches

        def push_to_user(self, session, user_id, **kwargs):
            # stand-in for couchers.notifications.push._push_to_user
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            push = self.by_user(user_id)[ix]
            for kwarg, expected in kwargs.items():
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                actual = push.kwargs[kwarg]
                assert actual == expected, (
                    f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{expected}' but got '{actual}'"
                )

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    pc = PushCollector()

    with patch("couchers.notifications.push._push_to_user", pc.push_to_user):
        yield pc