Coverage for src/tests/test_fixtures.py: 98%

514 statements  

coverage.py v7.6.10, created at 2025-04-16 15:13 +0000

1import os 

2from concurrent import futures 

3from contextlib import contextmanager 

4from dataclasses import dataclass 

5from datetime import date, timedelta 

6from pathlib import Path 

7from unittest.mock import patch 

8 

9import grpc 

10import pytest 

11from sqlalchemy.orm import close_all_sessions 

12from sqlalchemy.sql import and_, or_, text 

13 

14from couchers.config import config 

15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

16from couchers.crypto import random_hex 

17from couchers.db import _get_base_engine, session_scope 

18from couchers.descriptor_pool import get_descriptor_pool 

19from couchers.interceptors import ( 

20 AuthValidatorInterceptor, 

21 CookieInterceptor, 

22 SessionInterceptor, 

23 _try_get_and_update_user_details, 

24) 

25from couchers.jobs.worker import process_job 

26from couchers.models import ( 

27 Base, 

28 FriendRelationship, 

29 FriendStatus, 

30 HostingStatus, 

31 Language, 

32 LanguageAbility, 

33 LanguageFluency, 

34 MeetupStatus, 

35 PassportSex, 

36 Region, 

37 RegionLived, 

38 RegionVisited, 

39 StrongVerificationAttempt, 

40 StrongVerificationAttemptStatus, 

41 Upload, 

42 User, 

43 UserBlock, 

44 UserSession, 

45) 

46from couchers.servicers.account import Account, Iris 

47from couchers.servicers.admin import Admin 

48from couchers.servicers.api import API 

49from couchers.servicers.auth import Auth, create_session 

50from couchers.servicers.blocking import Blocking 

51from couchers.servicers.bugs import Bugs 

52from couchers.servicers.communities import Communities 

53from couchers.servicers.conversations import Conversations 

54from couchers.servicers.discussions import Discussions 

55from couchers.servicers.donations import Donations, Stripe 

56from couchers.servicers.events import Events 

57from couchers.servicers.gis import GIS 

58from couchers.servicers.groups import Groups 

59from couchers.servicers.jail import Jail 

60from couchers.servicers.media import Media, get_media_auth_interceptor 

61from couchers.servicers.notifications import Notifications 

62from couchers.servicers.pages import Pages 

63from couchers.servicers.references import References 

64from couchers.servicers.reporting import Reporting 

65from couchers.servicers.requests import Requests 

66from couchers.servicers.resources import Resources 

67from couchers.servicers.search import Search 

68from couchers.servicers.threads import Threads 

69from couchers.sql import couchers_select as select 

70from couchers.utils import create_coordinate, now 

71from proto import ( 

72 account_pb2_grpc, 

73 admin_pb2_grpc, 

74 annotations_pb2, 

75 api_pb2_grpc, 

76 auth_pb2_grpc, 

77 blocking_pb2_grpc, 

78 bugs_pb2_grpc, 

79 communities_pb2_grpc, 

80 conversations_pb2_grpc, 

81 discussions_pb2_grpc, 

82 donations_pb2_grpc, 

83 events_pb2_grpc, 

84 gis_pb2_grpc, 

85 groups_pb2_grpc, 

86 iris_pb2_grpc, 

87 jail_pb2_grpc, 

88 media_pb2_grpc, 

89 notifications_pb2_grpc, 

90 pages_pb2_grpc, 

91 references_pb2_grpc, 

92 reporting_pb2_grpc, 

93 requests_pb2_grpc, 

94 resources_pb2_grpc, 

95 search_pb2_grpc, 

96 stripe_pb2_grpc, 

97 threads_pb2_grpc, 

98) 

99 

100 

101def drop_all(): 

102 """drop everything currently in the database""" 

103 with session_scope() as session: 

104 # postgis is required for all the Geographic Information System (GIS) stuff 

105 # pg_trgm is required for trigram based search 

106 # btree_gist is required for gist-based exclusion constraints 

107 session.execute( 

108 text( 

109 "DROP SCHEMA IF EXISTS public CASCADE;" 

110 "DROP SCHEMA IF EXISTS logging CASCADE;" 

111 "DROP EXTENSION IF EXISTS postgis CASCADE;" 

112 "CREATE SCHEMA public;" 

113 "CREATE SCHEMA logging;" 

114 "CREATE EXTENSION postgis;" 

115 "CREATE EXTENSION pg_trgm;" 

116 "CREATE EXTENSION btree_gist;" 

117 ) 

118 ) 

119 

120 # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise 

121 # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585" 

122 # and similar errors 

123 _get_base_engine().dispose() 

124 

125 close_all_sessions() 
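
# Editor's note: a minimal usage sketch, not part of the measured file. drop_all() leaves the
# schemas and extensions above freshly recreated, which can be sanity-checked directly.
def _example_check_extensions():
    drop_all()
    with session_scope() as session:
        extensions = session.execute(text("SELECT extname FROM pg_extension")).scalars().all()
    assert {"postgis", "pg_trgm", "btree_gist"} <= set(extensions)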

126 

127 

128def create_schema_from_models(): 

129 """ 

130 Create everything from the current models, not incrementally 

131 through migrations. 

132 """ 

133 

134 # create the slugify function 

135 functions = Path(__file__).parent / "slugify.sql" 

136 with open(functions) as f, session_scope() as session: 

137 session.execute(text(f.read())) 

138 

139 Base.metadata.create_all(_get_base_engine()) 

140 

141 

142def populate_testing_resources(session): 

143 """ 

144 Testing version of couchers.resources.copy_resources_to_database 

145 """ 

146 regions = [ 

147 ("AUS", "Australia"), 

148 ("CAN", "Canada"), 

149 ("CHE", "Switzerland"), 

150 ("CUB", "Cuba"), 

151 ("CXR", "Christmas Island"), 

152 ("CZE", "Czechia"), 

153 ("DEU", "Germany"), 

154 ("EGY", "Egypt"), 

155 ("ESP", "Spain"), 

156 ("EST", "Estonia"), 

157 ("FIN", "Finland"), 

158 ("FRA", "France"), 

159 ("GBR", "United Kingdom"), 

160 ("GEO", "Georgia"), 

161 ("GHA", "Ghana"), 

162 ("GRC", "Greece"), 

163 ("HKG", "Hong Kong"), 

164 ("IRL", "Ireland"), 

165 ("ISR", "Israel"), 

166 ("ITA", "Italy"), 

167 ("JPN", "Japan"), 

168 ("LAO", "Laos"), 

169 ("MEX", "Mexico"), 

170 ("MMR", "Myanmar"), 

171 ("NAM", "Namibia"), 

172 ("NLD", "Netherlands"), 

173 ("NZL", "New Zealand"), 

174 ("POL", "Poland"), 

175 ("PRK", "North Korea"), 

176 ("REU", "Réunion"), 

177 ("SGP", "Singapore"), 

178 ("SWE", "Sweden"), 

179 ("THA", "Thailand"), 

180 ("TUR", "Turkey"), 

181 ("TWN", "Taiwan"), 

182 ("USA", "United States"), 

183 ("VNM", "Vietnam"), 

184 ] 

185 

186 languages = [ 

187 ("arb", "Arabic (Standard)"), 

188 ("deu", "German"), 

189 ("eng", "English"), 

190 ("fin", "Finnish"), 

191 ("fra", "French"), 

192 ("heb", "Hebrew"), 

193 ("hun", "Hungarian"), 

194 ("jpn", "Japanese"), 

195 ("pol", "Polish"), 

196 ("swe", "Swedish"), 

197 ("cmn", "Chinese (Mandarin)"), 

198 ] 

199 

200 with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f: 

201 tz_sql = f.read() 

202 

203 for code, name in regions: 

204 session.add(Region(code=code, name=name)) 

205 

206 for code, name in languages: 

207 session.add(Language(code=code, name=name)) 

208 

209 session.execute(text(tz_sql)) 

210 

211 

212def recreate_database(): 

213 """ 

214 Connect to a running Postgres database and build it using metadata.create_all() 

215 """ 

216 

217 # running in non-UTC catches some timezone errors 

218 os.environ["TZ"] = "America/New_York" 

219 

220 # drop everything currently in the database 

221 drop_all() 

222 

223 # create everything from the current models, not incrementally through migrations 

224 create_schema_from_models() 

225 

226 with session_scope() as session: 

227 populate_testing_resources(session) 

228 

229 

230@pytest.fixture() 

231def db(): 

232 """ 

233 Pytest fixture to connect to a running Postgres database and build it using metadata.create_all() 

234 """ 

235 

236 recreate_database() 
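
# Editor's note: a usage sketch, not part of the measured file. A test requests the `db` fixture
# so the database is rebuilt before it runs; real tests typically pair it with the class-scoped
# `testconfig` fixture defined near the end of this file. The test name is hypothetical.
def test_example_uses_db(db):
    user, token = generate_user()
    assert user.username.startswith("test_user_")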

237 

238 

239def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs): 

240 """ 

241 Create a new user, return session token 

242 

243 The user is detached from any session, and you can access its static attributes, but you can't modify it 

244 

245 Use this most of the time 

246 """ 

247 auth = Auth() 

248 

249 with session_scope() as session: 

250 # default args 

251 username = "test_user_" + random_hex(16) 

252 user_opts = { 

253 "username": username, 

254 "email": f"{username}@dev.couchers.org", 

255 # password is just 'password' 

256 # this is hardcoded because the password is slow to hash (so would slow down tests otherwise) 

257 "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg", 

258 "name": username.capitalize(), 

259 "hosting_status": HostingStatus.cant_host, 

260 "meetup_status": MeetupStatus.open_to_meetup, 

261 "city": "Testing city", 

262 "hometown": "Test hometown", 

263 "community_standing": 0.5, 

264 "birthdate": date(year=2000, month=1, day=1), 

265 "gender": "Woman", 

266 "pronouns": "", 

267 "occupation": "Tester", 

268 "education": "UST(esting)", 

269 "about_me": "I test things", 

270 "things_i_like": "Code", 

271 "about_place": "My place has a lot of testing paraphernalia", 

272 "additional_information": "I can be a bit testy", 

273 # if you update these, make sure the user still ends up jailed/not jailed on request as intended 

274 "accepted_tos": TOS_VERSION, 

275 "accepted_community_guidelines": GUIDELINES_VERSION, 

276 "geom": create_coordinate(40.7108, -73.9740), 

277 "geom_radius": 100, 

278 "onboarding_emails_sent": 1, 

279 "last_onboarding_email_sent": now(), 

280 "has_donated": True, 

281 } 

282 

283 for key, value in kwargs.items(): 

284 user_opts[key] = value 

285 

286 user = User(**user_opts) 

287 session.add(user) 

288 session.flush() 

289 

290 session.add(RegionVisited(user_id=user.id, region_code="CHE")) 

291 session.add(RegionVisited(user_id=user.id, region_code="REU")) 

292 session.add(RegionVisited(user_id=user.id, region_code="FIN")) 

293 

294 session.add(RegionLived(user_id=user.id, region_code="ESP")) 

295 session.add(RegionLived(user_id=user.id, region_code="FRA")) 

296 session.add(RegionLived(user_id=user.id, region_code="EST")) 

297 

298 session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent)) 

299 session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner)) 

300 

301 # this expires the user, so now it's "dirty" 

302 session.commit() 

303 

304 class _DummyContext: 

305 def invocation_metadata(self): 

306 return {} 

307 

308 token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False) 

309 

310 # a deleted user aborts session creation, so the deletion has to happen after create_session and needs a second commit 

311 if delete_user: 

312 user.is_deleted = True 

313 

314 user.recommendation_score = 1e10 - user.id 

315 

316 if complete_profile: 

317 key = random_hex(32) 

318 filename = random_hex(32) + ".jpg" 

319 session.add( 

320 Upload( 

321 key=key, 

322 filename=filename, 

323 creator_user_id=user.id, 

324 ) 

325 ) 

326 session.flush() 

327 user.avatar_key = key 

328 user.about_me = "I have a complete profile!\n" * 20 

329 

330 if strong_verification: 

331 attempt = StrongVerificationAttempt( 

332 verification_attempt_token=f"verification_attempt_token_{user.id}", 

333 user_id=user.id, 

334 status=StrongVerificationAttemptStatus.succeeded, 

335 has_full_data=True, 

336 passport_encrypted_data=b"not real", 

337 passport_date_of_birth=user.birthdate, 

338 passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get( 

339 user.gender, PassportSex.unspecified 

340 ), 

341 has_minimal_data=True, 

342 passport_expiry_date=date.today() + timedelta(days=10), 

343 passport_nationality="UTO", 

344 passport_last_three_document_chars=f"{user.id:03}", 

345 iris_token=f"iris_token_{user.id}", 

346 iris_session_id=user.id, 

347 ) 

348 session.add(attempt) 

349 session.flush() 

350 assert attempt.has_strong_verification(user) 

351 

352 session.commit() 

353 

354 assert user.has_completed_profile == complete_profile 

355 

356 # refresh it, which undoes the expiry 

357 session.refresh(user) 

358 

359 # this loads the user's lazily loaded timezone info, otherwise we'll get issues if we try to refer to it later 

360 user.timezone # noqa: B018 

361 

362 # detach the user from the session, allowing its use outside this session 

363 session.expunge(user) 

364 

365 return user, token 
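
# Editor's note: a usage sketch, not part of the measured file. The returned user is detached,
# so its attributes can be read (but not modified), and the token is what the *_session helpers
# below expect. Any User column can be overridden via keyword arguments.
def _example_generate_users():
    user1, token1 = generate_user()
    user2, _ = generate_user(name="Alice", has_donated=False)  # override default columns
    user3, _ = generate_user(delete_user=True)                 # a soft-deleted user
    return user1, token1, user2, user3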

366 

367 

368def get_user_id_and_token(session, username): 

369 user_id = session.execute(select(User).where(User.username == username)).scalar_one().id 

370 token = session.execute(select(UserSession).where(UserSession.user_id == user_id)).scalar_one().token 

371 return user_id, token 
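
# Editor's note: a usage sketch, not part of the measured file, looking a user's id and current
# session token back up by username inside an explicit session scope.
def _example_lookup(username):
    with session_scope() as session:
        return get_user_id_and_token(session, username)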

372 

373 

374def make_friends(user1, user2): 

375 with session_scope() as session: 

376 friend_relationship = FriendRelationship( 

377 from_user_id=user1.id, 

378 to_user_id=user2.id, 

379 status=FriendStatus.accepted, 

380 ) 

381 session.add(friend_relationship) 

382 

383 

384def make_user_block(user1, user2): 

385 with session_scope() as session: 

386 user_block = UserBlock( 

387 blocking_user_id=user1.id, 

388 blocked_user_id=user2.id, 

389 ) 

390 session.add(user_block) 

391 session.commit() 
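
# Editor's note: a usage sketch, not part of the measured file, wiring up relationships between
# two freshly generated users before exercising an API that depends on them.
def _example_relationships():
    user1, _ = generate_user()
    user2, _ = generate_user()
    make_friends(user1, user2)      # accepted friendship
    make_user_block(user1, user2)   # user1 blocks user2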

392 

393 

394def make_user_invisible(user_id): 

395 with session_scope() as session: 

396 session.execute(select(User).where(User.id == user_id)).scalar_one().is_banned = True 

397 

398 

399# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship 

400def get_friend_relationship(user1, user2): 

401 with session_scope() as session: 

402 friend_relationship = session.execute( 

403 select(FriendRelationship).where( 

404 or_( 

405 and_(FriendRelationship.from_user_id == user1.id, FriendRelationship.to_user_id == user2.id),

406 and_(FriendRelationship.from_user_id == user2.id, FriendRelationship.to_user_id == user1.id),

407 ) 

408 ) 

409 ).scalar_one_or_none() 

410 

411 session.expunge(friend_relationship) 

412 return friend_relationship 
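
# Editor's note: a usage sketch, not part of the measured file. After make_friends() the
# relationship returned here should be in the accepted state.
def _example_friend_relationship(user1, user2):
    make_friends(user1, user2)
    relationship = get_friend_relationship(user1, user2)
    assert relationship.status == FriendStatus.accepted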

413 

414 

415class CookieMetadataPlugin(grpc.AuthMetadataPlugin): 

416 """ 

417 Injects the right `cookie: couchers-sesh=...` header into the metadata 

418 """ 

419 

420 def __init__(self, token): 

421 self.token = token 

422 

423 def __call__(self, context, callback): 

424 callback((("cookie", f"couchers-sesh={self.token}"),), None) 

425 

426 

427@contextmanager 

428def auth_api_session(grpc_channel_options=()): 

429 """ 

430 Create an Auth API for testing 

431 

432 This needs to use the real server since it plays around with headers 

433 """ 

434 with futures.ThreadPoolExecutor(1) as executor: 

435 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()]) 

436 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

437 auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server) 

438 server.start() 

439 

440 try: 

441 with grpc.secure_channel( 

442 f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options 

443 ) as channel: 

444 

445 class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor): 

446 def __init__(self): 

447 self.latest_headers = {} 

448 

449 def intercept_unary_unary(self, continuation, client_call_details, request): 

450 call = continuation(client_call_details, request) 

451 self.latest_headers = dict(call.initial_metadata()) 

452 self.latest_header_raw = call.initial_metadata() 

453 return call 

454 

455 metadata_interceptor = _MetadataKeeperInterceptor() 

456 channel = grpc.intercept_channel(channel, metadata_interceptor) 

457 yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor 

458 finally: 

459 server.stop(None).wait() 
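
# Editor's note: a sketch, not part of the measured file. The yielded pair is the Auth stub and
# the interceptor that records response headers, which is how cookie-setting behaviour can be
# inspected; the RPC in the comment is a hypothetical placeholder, not a real Auth method.
def _example_auth_headers():
    with auth_api_session() as (auth_api, metadata_interceptor):
        # res = auth_api.SomeAuthRpc(SomeAuthReq(...))  # hypothetical call that sets a cookie
        return metadata_interceptor.latest_headers.get("set-cookie")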

460 

461 

462@contextmanager 

463def api_session(token): 

464 """ 

465 Create an API for testing, uses the token for auth 

466 """ 

467 channel = fake_channel(token) 

468 api_pb2_grpc.add_APIServicer_to_server(API(), channel) 

469 yield api_pb2_grpc.APIStub(channel) 
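
# Editor's note: a usage sketch, not part of the measured file. Combine generate_user() with
# api_session() to call the API servicer through the fake channel; the exact request message
# depends on the proto definitions, so it is left as a hypothetical comment.
def _example_api_call():
    user, token = generate_user()
    with api_session(token) as api:
        # res = api.GetUser(api_pb2.GetUserReq(user=user.username))  # hypothetical request
        pass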

470 

471 

472@contextmanager 

473def real_api_session(token): 

474 """ 

475 Create an API for testing, using TCP sockets, uses the token for auth 

476 """ 

477 with futures.ThreadPoolExecutor(1) as executor: 

478 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()]) 

479 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

480 api_pb2_grpc.add_APIServicer_to_server(API(), server) 

481 server.start() 

482 

483 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

484 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

485 

486 try: 

487 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

488 yield api_pb2_grpc.APIStub(channel) 

489 finally: 

490 server.stop(None).wait() 

491 

492 

493@contextmanager 

494def real_admin_session(token): 

495 """ 

496 Create an Admin service for testing, using TCP sockets, uses the token for auth 

497 """ 

498 with futures.ThreadPoolExecutor(1) as executor: 

499 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()]) 

500 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

501 admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server) 

502 server.start() 

503 

504 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

505 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

506 

507 try: 

508 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

509 yield admin_pb2_grpc.AdminStub(channel) 

510 finally: 

511 server.stop(None).wait() 

512 

513 

514@contextmanager 

515def real_account_session(token): 

516 """ 

517 Create an Account service for testing, using TCP sockets, uses the token for auth 

518 """ 

519 with futures.ThreadPoolExecutor(1) as executor: 

520 server = grpc.server( 

521 executor, interceptors=[AuthValidatorInterceptor(), CookieInterceptor(), SessionInterceptor()] 

522 ) 

523 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

524 account_pb2_grpc.add_AccountServicer_to_server(Account(), server) 

525 server.start() 

526 

527 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

528 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

529 

530 try: 

531 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

532 yield account_pb2_grpc.AccountStub(channel) 

533 finally: 

534 server.stop(None).wait() 

535 

536 

537@contextmanager 

538def real_jail_session(token): 

539 """ 

540 Create a Jail service for testing, using TCP sockets, uses the token for auth 

541 """ 

542 with futures.ThreadPoolExecutor(1) as executor: 

543 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()]) 

544 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

545 jail_pb2_grpc.add_JailServicer_to_server(Jail(), server) 

546 server.start() 

547 

548 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

549 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

550 

551 try: 

552 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

553 yield jail_pb2_grpc.JailStub(channel) 

554 finally: 

555 server.stop(None).wait() 

556 

557 

558@contextmanager 

559def gis_session(token): 

560 channel = fake_channel(token) 

561 gis_pb2_grpc.add_GISServicer_to_server(GIS(), channel) 

562 yield gis_pb2_grpc.GISStub(channel) 

563 

564 

565class FakeRpcError(grpc.RpcError): 

566 def __init__(self, code, details): 

567 self._code = code 

568 self._details = details 

569 

570 def code(self): 

571 return self._code 

572 

573 def details(self): 

574 return self._details 

575 

576 

577def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry): 

578 # method is of the form "/org.couchers.api.core.API/GetUser" 

579 _, service_name, method_name = method.split("/") 

580 

581 service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions() 

582 auth_level = service_options.Extensions[annotations_pb2.auth_level] 

583 assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN 

584 assert auth_level in [ 

585 annotations_pb2.AUTH_LEVEL_OPEN, 

586 annotations_pb2.AUTH_LEVEL_JAILED, 

587 annotations_pb2.AUTH_LEVEL_SECURE, 

588 annotations_pb2.AUTH_LEVEL_ADMIN, 

589 ] 

590 

591 if not user_id: 

592 assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN 

593 else: 

594 assert not (auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser), ( 

595 "Non-superuser tried to call superuser API" 

596 ) 

597 assert not ( 

598 is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED] 

599 ), "User is jailed but tried to call non-open/non-jailed API" 

600 

601 

602class FakeChannel: 

603 def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None): 

604 self.handlers = {} 

605 self.user_id = user_id 

606 self._is_jailed = is_jailed 

607 self._is_superuser = is_superuser 

608 self._token_expiry = token_expiry 

609 

610 def abort(self, code, details): 

611 raise FakeRpcError(code, details) 

612 

613 def add_generic_rpc_handlers(self, generic_rpc_handlers): 

614 from grpc._server import _validate_generic_rpc_handlers 

615 

616 _validate_generic_rpc_handlers(generic_rpc_handlers) 

617 

618 self.handlers.update(generic_rpc_handlers[0]._method_handlers) 

619 

620 def unary_unary(self, uri, request_serializer, response_deserializer): 

621 handler = self.handlers[uri] 

622 

623 _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry) 

624 

625 def fake_handler(request): 

626 # Do a full serialization cycle on the request and the 

627 # response to catch accidental use of unserializable data. 

628 request = handler.request_deserializer(request_serializer(request)) 

629 

630 with session_scope() as session: 

631 response = handler.unary_unary(request, self, session) 

632 

633 return response_deserializer(handler.response_serializer(response)) 

634 

635 return fake_handler 

636 

637 

638def fake_channel(token=None): 

639 if token: 

640 user_id, is_jailed, is_superuser, token_expiry, ui_language_preference = _try_get_and_update_user_details( 

641 token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent" 

642 ) 

643 return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry) 

644 return FakeChannel() 
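
# Editor's note: a sketch, not part of the measured file, of the pattern every *_session helper
# below follows; "Widgets" is a purely hypothetical servicer used for illustration.
#
# @contextmanager
# def widgets_session(token):
#     channel = fake_channel(token)
#     widgets_pb2_grpc.add_WidgetsServicer_to_server(Widgets(), channel)
#     yield widgets_pb2_grpc.WidgetsStub(channel)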

645 

646 

647@contextmanager 

648def conversations_session(token): 

649 """ 

650 Create a Conversations API for testing, uses the token for auth 

651 """ 

652 channel = fake_channel(token) 

653 conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), channel) 

654 yield conversations_pb2_grpc.ConversationsStub(channel) 

655 

656 

657@contextmanager 

658def requests_session(token): 

659 """ 

660 Create a Requests API for testing, uses the token for auth 

661 """ 

662 channel = fake_channel(token) 

663 requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), channel) 

664 yield requests_pb2_grpc.RequestsStub(channel) 

665 

666 

667@contextmanager 

668def threads_session(token): 

669 channel = fake_channel(token) 

670 threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), channel) 

671 yield threads_pb2_grpc.ThreadsStub(channel) 

672 

673 

674@contextmanager 

675def discussions_session(token): 

676 channel = fake_channel(token) 

677 discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), channel) 

678 yield discussions_pb2_grpc.DiscussionsStub(channel) 

679 

680 

681@contextmanager 

682def donations_session(token): 

683 channel = fake_channel(token) 

684 donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), channel) 

685 yield donations_pb2_grpc.DonationsStub(channel) 

686 

687 

688@contextmanager 

689def real_stripe_session(): 

690 """ 

691 Create a Stripe service for testing, using TCP sockets 

692 """ 

693 with futures.ThreadPoolExecutor(1) as executor: 

694 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()]) 

695 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

696 stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server) 

697 server.start() 

698 

699 creds = grpc.local_channel_credentials() 

700 

701 try: 

702 with grpc.secure_channel(f"localhost:{port}", creds) as channel: 

703 yield stripe_pb2_grpc.StripeStub(channel) 

704 finally: 

705 server.stop(None).wait() 

706 

707 

708@contextmanager 

709def real_iris_session(): 

710 with futures.ThreadPoolExecutor(1) as executor: 

711 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()]) 

712 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

713 iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server) 

714 server.start() 

715 

716 creds = grpc.local_channel_credentials() 

717 

718 try: 

719 with grpc.secure_channel(f"localhost:{port}", creds) as channel: 

720 yield iris_pb2_grpc.IrisStub(channel) 

721 finally: 

722 server.stop(None).wait() 

723 

724 

725@contextmanager 

726def pages_session(token): 

727 channel = fake_channel(token) 

728 pages_pb2_grpc.add_PagesServicer_to_server(Pages(), channel) 

729 yield pages_pb2_grpc.PagesStub(channel) 

730 

731 

732@contextmanager 

733def communities_session(token): 

734 channel = fake_channel(token) 

735 communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), channel) 

736 yield communities_pb2_grpc.CommunitiesStub(channel) 

737 

738 

739@contextmanager 

740def groups_session(token): 

741 channel = fake_channel(token) 

742 groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), channel) 

743 yield groups_pb2_grpc.GroupsStub(channel) 

744 

745 

746@contextmanager 

747def blocking_session(token): 

748 channel = fake_channel(token) 

749 blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), channel) 

750 yield blocking_pb2_grpc.BlockingStub(channel) 

751 

752 

753@contextmanager 

754def notifications_session(token): 

755 channel = fake_channel(token) 

756 notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), channel) 

757 yield notifications_pb2_grpc.NotificationsStub(channel) 

758 

759 

760@contextmanager 

761def account_session(token): 

762 """ 

763 Create an Account API for testing, uses the token for auth 

764 """ 

765 channel = fake_channel(token) 

766 account_pb2_grpc.add_AccountServicer_to_server(Account(), channel) 

767 yield account_pb2_grpc.AccountStub(channel) 

768 

769 

770@contextmanager 

771def search_session(token): 

772 """ 

773 Create a Search API for testing, uses the token for auth 

774 """ 

775 channel = fake_channel(token) 

776 search_pb2_grpc.add_SearchServicer_to_server(Search(), channel) 

777 yield search_pb2_grpc.SearchStub(channel) 

778 

779 

780@contextmanager 

781def references_session(token): 

782 """ 

783 Create a References API for testing, uses the token for auth 

784 """ 

785 channel = fake_channel(token) 

786 references_pb2_grpc.add_ReferencesServicer_to_server(References(), channel) 

787 yield references_pb2_grpc.ReferencesStub(channel) 

788 

789 

790@contextmanager 

791def reporting_session(token): 

792 channel = fake_channel(token) 

793 reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), channel) 

794 yield reporting_pb2_grpc.ReportingStub(channel) 

795 

796 

797@contextmanager 

798def events_session(token): 

799 channel = fake_channel(token) 

800 events_pb2_grpc.add_EventsServicer_to_server(Events(), channel) 

801 yield events_pb2_grpc.EventsStub(channel) 

802 

803 

804@contextmanager 

805def bugs_session(token=None): 

806 channel = fake_channel(token) 

807 bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), channel) 

808 yield bugs_pb2_grpc.BugsStub(channel) 

809 

810 

811@contextmanager 

812def resources_session(): 

813 channel = fake_channel() 

814 resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), channel) 

815 yield resources_pb2_grpc.ResourcesStub(channel) 

816 

817 

818@contextmanager 

819def media_session(bearer_token): 

820 """ 

821 Create a fresh Media API for testing, uses the bearer token for media auth 

822 """ 

823 media_auth_interceptor = get_media_auth_interceptor(bearer_token) 

824 

825 with futures.ThreadPoolExecutor(1) as executor: 

826 server = grpc.server(executor, interceptors=[media_auth_interceptor, SessionInterceptor()]) 

827 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

828 servicer = Media() 

829 media_pb2_grpc.add_MediaServicer_to_server(servicer, server) 

830 server.start() 

831 

832 call_creds = grpc.access_token_call_credentials(bearer_token) 

833 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

834 

835 try: 

836 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

837 yield media_pb2_grpc.MediaStub(channel) 

838 finally: 

839 server.stop(None).wait() 

840 

841 

842@pytest.fixture(scope="class") 

843def testconfig(): 

844 prevconfig = config.copy() 

845 config.clear() 

846 config.update(prevconfig) 

847 

848 config["IN_TEST"] = True 

849 

850 config["DEV"] = True 

851 config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b") 

852 config["VERSION"] = "testing_version" 

853 config["BASE_URL"] = "http://localhost:3000" 

854 config["BACKEND_BASE_URL"] = "http://localhost:8888" 

855 config["CONSOLE_BASE_URL"] = "http://localhost:8888" 

856 config["COOKIE_DOMAIN"] = "localhost" 

857 

858 config["ENABLE_SMS"] = False 

859 config["SMS_SENDER_ID"] = "invalid" 

860 

861 config["ENABLE_EMAIL"] = False 

862 config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org" 

863 config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid" 

864 config["NOTIFICATION_PREFIX"] = "[TEST] " 

865 config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid" 

866 config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid" 

867 config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid" 

868 

869 config["ENABLE_DONATIONS"] = False 

870 config["STRIPE_API_KEY"] = "" 

871 config["STRIPE_WEBHOOK_SECRET"] = "" 

872 config["STRIPE_RECURRING_PRODUCT_ID"] = "" 

873 

874 config["ENABLE_STRONG_VERIFICATION"] = False 

875 config["IRIS_ID_PUBKEY"] = "" 

876 config["IRIS_ID_SECRET"] = "" 

877 # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272 

878 config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex( 

879 "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f" 

880 ) 

881 

882 config["SMTP_HOST"] = "localhost" 

883 config["SMTP_PORT"] = 587 

884 config["SMTP_USERNAME"] = "username" 

885 config["SMTP_PASSWORD"] = "password" 

886 

887 config["ENABLE_MEDIA"] = True 

888 config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex( 

889 "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc" 

890 ) 

891 config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e" 

892 config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001" 

893 config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001" 

894 

895 config["BUG_TOOL_ENABLED"] = False 

896 config["BUG_TOOL_GITHUB_REPO"] = "org/repo" 

897 config["BUG_TOOL_GITHUB_USERNAME"] = "user" 

898 config["BUG_TOOL_GITHUB_TOKEN"] = "token" 

899 

900 config["LISTMONK_ENABLED"] = False 

901 config["LISTMONK_BASE_URL"] = "https://localhost" 

902 config["LISTMONK_API_USERNAME"] = "..." 

903 config["LISTMONK_API_KEY"] = "..." 

904 config["LISTMONK_LIST_ID"] = 3 

905 

906 config["PUSH_NOTIFICATIONS_ENABLED"] = True 

907 config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI" 

908 config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid" 

909 

910 config["ACTIVENESS_PROBES_ENABLED"] = True 

911 

912 yield None 

913 

914 config.clear() 

915 config.update(prevconfig) 
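
# Editor's note: a sketch, not part of the measured file. On top of what testconfig sets, an
# individual test can temporarily override a single config value and restore it afterwards.
@contextmanager
def _example_override_config(key, value):
    old = config[key]
    config[key] = value
    try:
        yield
    finally:
        config[key] = old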

916 

917 

918@pytest.fixture 

919def fast_passwords(): 

920 # password hashing, by design, takes a lot of time, which slows down the tests. here we jump through some hoops to 

921 # make this fast by removing the hashing step 

922 

923 def fast_hash(password: bytes) -> bytes: 

924 return b"fake hash:" + password 

925 

926 def fast_verify(hashed: bytes, password: bytes) -> bool: 

927 return hashed == fast_hash(password) 

928 

929 with patch("couchers.crypto.nacl.pwhash.verify", fast_verify): 

930 with patch("couchers.crypto.nacl.pwhash.str", fast_hash): 

931 yield 
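
# Editor's note: a usage sketch, not part of the measured file. With fast_passwords active, any
# password hashing or verification during the test uses the fake hash above and runs instantly.
def test_example_fast_passwords(db, fast_passwords):
    user, token = generate_user(hashed_password=b"fake hash:password123")
    # a login-style RPC exercised here would now verify b"password123" against the fake hash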

932 

933 

934def process_jobs(): 

935 while process_job(): 

936 pass 

937 

938 

939@contextmanager 

940def mock_notification_email(): 

941 with patch("couchers.email._queue_email") as mock: 

942 yield mock 

943 process_jobs() 

944 

945 

946@dataclass 

947class EmailData: 

948 sender_name: str 

949 sender_email: str 

950 recipient: str 

951 subject: str 

952 plain: str 

953 html: str 

954 source_data: str 

955 list_unsubscribe_header: str 

956 

957 

958def email_fields(mock, call_ix=0): 

959 _, kw = mock.call_args_list[call_ix] 

960 return EmailData( 

961 sender_name=kw.get("sender_name"), 

962 sender_email=kw.get("sender_email"), 

963 recipient=kw.get("recipient"), 

964 subject=kw.get("subject"), 

965 plain=kw.get("plain"), 

966 html=kw.get("html"), 

967 source_data=kw.get("source_data"), 

968 list_unsubscribe_header=kw.get("list_unsubscribe_header"), 

969 ) 
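
# Editor's note: a usage sketch, not part of the measured file: capture queued emails, let the
# background jobs run on context exit, then inspect the fields of the first captured call.
def _example_check_email():
    with mock_notification_email() as mock:
        pass  # trigger something that queues an email here
    if mock.call_count:
        email = email_fields(mock, call_ix=0)
        assert email.recipient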

970 

971 

972@pytest.fixture 

973def push_collector(): 

974 """ 

975 See test_SendTestPushNotification for an example of how to use this fixture 

976 """ 

977 

978 class Push: 

979 """ 

980 This allows nice access to the push info via e.g. push.title instead of push["title"] 

981 """ 

982 

983 def __init__(self, kwargs): 

984 self.kwargs = kwargs 

985 

986 def __getattr__(self, attr): 

987 try: 

988 return self.kwargs[attr] 

989 except KeyError: 

990 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None 

991 

992 def __repr__(self): 

993 kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items()) 

994 return f"Push({kwargs_disp})" 

995 

996 class PushCollector: 

997 def __init__(self): 

998 # pairs of (user_id, push) 

999 self.pushes = [] 

1000 

1001 def by_user(self, user_id): 

1002 return [kwargs for uid, kwargs in self.pushes if uid == user_id] 

1003 

1004 def push_to_user(self, session, user_id, **kwargs): 

1005 self.pushes.append((user_id, Push(kwargs=kwargs))) 

1006 

1007 def assert_user_has_count(self, user_id, count): 

1008 assert len(self.by_user(user_id)) == count 

1009 

1010 def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs): 

1011 push = self.by_user(user_id)[ix] 

1012 for kwarg in kwargs: 

1013 assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'" 

1014 assert push.kwargs[kwarg] == kwargs[kwarg], ( 

1015 f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'" 

1016 ) 

1017 

1018 def assert_user_has_single_matching(self, user_id, **kwargs): 

1019 self.assert_user_has_count(user_id, 1) 

1020 self.assert_user_push_matches_fields(user_id, ix=0, **kwargs) 

1021 

1022 collector = PushCollector() 

1023 

1024 with patch("couchers.notifications.push._push_to_user", collector.push_to_user): 

1025 yield collector
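
# Editor's note: a usage sketch, not part of the measured file (see test_SendTestPushNotification
# for a real one). Request the fixture, trigger a notification, then assert on what was pushed.
def test_example_push(db, push_collector):
    user, token = generate_user()
    # ...trigger a push notification for `user` here...
    push_collector.assert_user_has_count(user.id, 0)  # nothing has been pushed yet in this sketch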