Coverage for src/tests/test_fixtures.py: 98%

513 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-03-11 15:27 +0000

1import os 

2from concurrent import futures 

3from contextlib import contextmanager 

4from dataclasses import dataclass 

5from datetime import date, timedelta 

6from pathlib import Path 

7from unittest.mock import patch 

8 

9import grpc 

10import pytest 

11from sqlalchemy.orm import close_all_sessions 

12from sqlalchemy.sql import or_, text 

13 

14from couchers.config import config 

15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

16from couchers.crypto import random_hex 

17from couchers.db import _get_base_engine, session_scope 

18from couchers.descriptor_pool import get_descriptor_pool 

19from couchers.interceptors import AuthValidatorInterceptor, SessionInterceptor, _try_get_and_update_user_details 

20from couchers.jobs.worker import process_job 

21from couchers.models import ( 

22 Base, 

23 FriendRelationship, 

24 FriendStatus, 

25 HostingStatus, 

26 Language, 

27 LanguageAbility, 

28 LanguageFluency, 

29 MeetupStatus, 

30 PassportSex, 

31 Region, 

32 RegionLived, 

33 RegionVisited, 

34 StrongVerificationAttempt, 

35 StrongVerificationAttemptStatus, 

36 Upload, 

37 User, 

38 UserBlock, 

39 UserSession, 

40) 

41from couchers.servicers.account import Account, Iris 

42from couchers.servicers.admin import Admin 

43from couchers.servicers.api import API 

44from couchers.servicers.auth import Auth, create_session 

45from couchers.servicers.blocking import Blocking 

46from couchers.servicers.bugs import Bugs 

47from couchers.servicers.communities import Communities 

48from couchers.servicers.conversations import Conversations 

49from couchers.servicers.discussions import Discussions 

50from couchers.servicers.donations import Donations, Stripe 

51from couchers.servicers.events import Events 

52from couchers.servicers.gis import GIS 

53from couchers.servicers.groups import Groups 

54from couchers.servicers.jail import Jail 

55from couchers.servicers.media import Media, get_media_auth_interceptor 

56from couchers.servicers.notifications import Notifications 

57from couchers.servicers.pages import Pages 

58from couchers.servicers.references import References 

59from couchers.servicers.reporting import Reporting 

60from couchers.servicers.requests import Requests 

61from couchers.servicers.resources import Resources 

62from couchers.servicers.search import Search 

63from couchers.servicers.threads import Threads 

64from couchers.sql import couchers_select as select 

65from couchers.utils import create_coordinate, now 

66from proto import ( 

67 account_pb2_grpc, 

68 admin_pb2_grpc, 

69 annotations_pb2, 

70 api_pb2_grpc, 

71 auth_pb2_grpc, 

72 blocking_pb2_grpc, 

73 bugs_pb2_grpc, 

74 communities_pb2_grpc, 

75 conversations_pb2_grpc, 

76 discussions_pb2_grpc, 

77 donations_pb2_grpc, 

78 events_pb2_grpc, 

79 gis_pb2_grpc, 

80 groups_pb2_grpc, 

81 iris_pb2_grpc, 

82 jail_pb2_grpc, 

83 media_pb2_grpc, 

84 notifications_pb2_grpc, 

85 pages_pb2_grpc, 

86 references_pb2_grpc, 

87 reporting_pb2_grpc, 

88 requests_pb2_grpc, 

89 resources_pb2_grpc, 

90 search_pb2_grpc, 

91 stripe_pb2_grpc, 

92 threads_pb2_grpc, 

93) 

94 

95 

def drop_all():
    """Drop everything currently in the database and recreate the bare schemas/extensions."""
    with session_scope() as session:
        # postgis is required for all the Geographic Information System (GIS) stuff
        # pg_trgm is required for trigram based search
        # btree_gist is required for gist-based exclusion constraints
        statements = (
            "DROP SCHEMA IF EXISTS public CASCADE;",
            "DROP SCHEMA IF EXISTS logging CASCADE;",
            "DROP EXTENSION IF EXISTS postgis CASCADE;",
            "CREATE SCHEMA public;",
            "CREATE SCHEMA logging;",
            "CREATE EXTENSION postgis;",
            "CREATE EXTENSION pg_trgm;",
            "CREATE EXTENSION btree_gist;",
        )
        session.execute(text("".join(statements)))

    # Reset the database connection pool: postgres caches some object info per
    # connection, and stale entries otherwise sometimes error out with
    # "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
    # and similar errors.
    _get_base_engine().dispose()

    close_all_sessions()

121 

122 

def create_schema_from_models():
    """
    Create everything from the current models, not incrementally
    through migrations.
    """
    # install the slugify SQL function before creating the tables
    slugify_sql = (Path(__file__).parent / "slugify.sql").read_text()
    with session_scope() as session:
        session.execute(text(slugify_sql))

    Base.metadata.create_all(_get_base_engine())

135 

136 

def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database

    Seeds a fixed set of regions and languages and runs the fake timezone-areas SQL.
    """
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # read the fake timezone areas SQL up front
    with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f:
        tz_sql = f.read()

    session.add_all(Region(code=code, name=name) for code, name in regions)
    session.add_all(Language(code=code, name=name) for code, name in languages)

    session.execute(text(tz_sql))

205 

206 

def recreate_database():
    """
    Connect to a running Postgres database and rebuild it using metadata.create_all().
    """
    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # wipe whatever is currently in the database
    drop_all()

    # build everything from the current models, not incrementally through migrations
    create_schema_from_models()

    with session_scope() as session:
        populate_testing_resources(session)

223 

224 

@pytest.fixture()
def db():
    """
    Pytest fixture: connect to a running Postgres database and rebuild it with
    metadata.create_all() so each test class starts from a fresh schema.
    """
    recreate_database()

232 

233 

def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Any extra keyword arguments override the default User column values below.
    """
    auth = Auth()  # NOTE(review): not referenced below — presumably kept for construction side effects; confirm

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "Woman",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied overrides win over the defaults
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        session.flush()  # assigns user.id so the rows below can reference it

        # give the user a plausible travel history
        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        # minimal stand-in for the gRPC servicer context expected by create_session
        class _DummyContext:
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        # deterministic ordering in recommendation-sorted listings: lower id => higher score
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # a complete profile needs an avatar and a sufficiently long about_me
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                # map the fixture's gender strings onto passport sexes; anything else is unspecified
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        # sanity check: the complete_profile flag must agree with the model's own notion
        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # detach the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token

361 

362 

def get_user_id_and_token(session, username):
    """Look up a user by username, returning their id and a session token for them."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token

367 

368 

def make_friends(user1, user2):
    """Create an accepted friend relationship from user1 to user2."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )

377 

378 

def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()

387 

388 

def make_user_invisible(user_id):
    """Mark the given user as banned (the fixtures' way of hiding a user)."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True

392 

393 

# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between the two users (in either direction),
    detached from the session, or None if there is none.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # Bug fix: this previously used the Python `and` operator between the two
                    # column comparisons, which does not build a SQL conjunction (SQLAlchemy
                    # clause elements are not usable as booleans) — use `&` so that both
                    # conditions actually end up in the generated query.
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # robustness: don't try to expunge a non-existent relationship
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

408 

409 

class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        cookie_header = ("cookie", f"couchers-sesh={self.token}")
        callback((cookie_header,), None)

420 

421 

@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields (AuthStub, metadata_interceptor); the interceptor records the response
    headers of the most recent call so tests can inspect e.g. set-cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0 lets the OS pick a free port; local credentials keep it loopback-only
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:

                # client-side interceptor that captures the initial metadata (headers)
                # of the latest unary-unary call
                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    def __init__(self):
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        self.latest_headers = dict(call.initial_metadata())
                        # keep the raw (possibly repeated-key) form too
                        self.latest_header_raw = call.initial_metadata()
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            server.stop(None).wait()

455 

456 

@contextmanager
def api_session(token):
    """
    Yield an API stub for testing, backed by a fake in-process channel authed with `token`.
    """
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    stub = api_pb2_grpc.APIStub(chan)
    yield stub

465 

466 

@contextmanager
def real_api_session(token):
    """
    Create an API for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        # cookie-based auth over a loopback-only secure channel
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            server.stop(None).wait()

486 

487 

@contextmanager
def real_admin_session(token):
    """
    Create an Admin service for testing, using TCP sockets; uses the token for auth.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            server.stop(None).wait()

507 

508 

@contextmanager
def real_account_session(token):
    """
    Create an Account service for testing, using TCP sockets; uses the token for auth.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            server.stop(None).wait()

528 

529 

@contextmanager
def real_jail_session(token):
    """
    Create a Jail service for testing, using TCP sockets; uses the token for auth.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            server.stop(None).wait()

549 

550 

@contextmanager
def gis_session(token):
    """Yield a GIS stub for testing, backed by a fake in-process channel authed with `token`."""
    chan = fake_channel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), chan)
    stub = gis_pb2_grpc.GISStub(chan)
    yield stub

556 

557 

class FakeRpcError(grpc.RpcError):
    """RpcError stand-in carrying a fixed status code and details string."""

    def __init__(self, code, details):
        self._status_code = code
        self._detail_text = details

    def code(self):
        return self._status_code

    def details(self):
        return self._detail_text

568 

569 

def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """Mimic the interceptor's auth check: assert that this caller may invoke `method`."""
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, _method_name = method.split("/")

    # the required auth level is declared as a service-level proto option
    service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = service_options.Extensions[annotations_pb2.auth_level]
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in (
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    )

    if not user_id:
        # anonymous callers may only hit open APIs
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
        return

    if auth_level == annotations_pb2.AUTH_LEVEL_ADMIN:
        assert is_superuser, "Non-superuser tried to call superuser API"
    if is_jailed:
        assert auth_level in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED], (
            "User is jailed but tried to call non-open/non-jailed API"
        )

593 

594 

class FakeChannel:
    """
    In-process stand-in for gRPC plumbing: servicers are registered onto it (server
    role), stubs are built from it (channel role), and it is passed as the servicer
    `context` (so `abort` raises a FakeRpcError like a real context would).
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        # maps full method URI -> RpcMethodHandler
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def abort(self, code, details):
        # servicer-context abort: surface it to the test as an exception
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        # NOTE(review): relies on grpc's private _server internals to validate and
        # unpack handlers — may break across grpc-python versions
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        # called by the generated stub; returns a callable that invokes the servicer
        handler = self.handlers[uri]

        # enforce the same auth rules the real interceptor would
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                # servicers here take (request, context, session); self plays the context
                response = handler.unary_unary(request, self, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler

629 

630 

def fake_channel(token=None):
    """Build a FakeChannel; with a token, resolve the caller's details as the real interceptor would."""
    if not token:
        return FakeChannel()
    user_id, is_jailed, is_superuser, token_expiry = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)

638 

639 

@contextmanager
def conversations_session(token):
    """
    Yield a Conversations API stub for testing, backed by a fake channel authed with `token`.
    """
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    stub = conversations_pb2_grpc.ConversationsStub(chan)
    yield stub

648 

649 

@contextmanager
def requests_session(token):
    """
    Yield a Requests API stub for testing, backed by a fake channel authed with `token`.
    """
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    stub = requests_pb2_grpc.RequestsStub(chan)
    yield stub

658 

659 

@contextmanager
def threads_session(token):
    """Yield a Threads stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    stub = threads_pb2_grpc.ThreadsStub(chan)
    yield stub

665 

666 

@contextmanager
def discussions_session(token):
    """Yield a Discussions stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    stub = discussions_pb2_grpc.DiscussionsStub(chan)
    yield stub

672 

673 

@contextmanager
def donations_session(token):
    """Yield a Donations stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    stub = donations_pb2_grpc.DonationsStub(chan)
    yield stub

679 

680 

@contextmanager
def real_stripe_session():
    """
    Create a Stripe service for testing, using TCP sockets (unauthenticated channel).
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            server.stop(None).wait()

699 

700 

@contextmanager
def real_iris_session():
    """Create an Iris service for testing, using TCP sockets (unauthenticated channel)."""
    with futures.ThreadPoolExecutor(1) as pool:
        server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            server.stop(None).wait()

716 

717 

@contextmanager
def pages_session(token):
    """Yield a Pages stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    stub = pages_pb2_grpc.PagesStub(chan)
    yield stub

723 

724 

@contextmanager
def communities_session(token):
    """Yield a Communities stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    stub = communities_pb2_grpc.CommunitiesStub(chan)
    yield stub

730 

731 

@contextmanager
def groups_session(token):
    """Yield a Groups stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    stub = groups_pb2_grpc.GroupsStub(chan)
    yield stub

737 

738 

@contextmanager
def blocking_session(token):
    """Yield a Blocking stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    stub = blocking_pb2_grpc.BlockingStub(chan)
    yield stub

744 

745 

@contextmanager
def notifications_session(token):
    """Yield a Notifications stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    stub = notifications_pb2_grpc.NotificationsStub(chan)
    yield stub

751 

752 

@contextmanager
def account_session(token):
    """
    Yield an Account API stub for testing, backed by a fake channel authed with `token`.
    """
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    stub = account_pb2_grpc.AccountStub(chan)
    yield stub

761 

762 

@contextmanager
def search_session(token):
    """
    Yield a Search API stub for testing, backed by a fake channel authed with `token`.
    """
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    stub = search_pb2_grpc.SearchStub(chan)
    yield stub

771 

772 

@contextmanager
def references_session(token):
    """
    Yield a References API stub for testing, backed by a fake channel authed with `token`.
    """
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    stub = references_pb2_grpc.ReferencesStub(chan)
    yield stub

781 

782 

@contextmanager
def reporting_session(token):
    """Yield a Reporting stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    stub = reporting_pb2_grpc.ReportingStub(chan)
    yield stub

788 

789 

@contextmanager
def events_session(token):
    """Yield an Events stub for testing, backed by a fake channel authed with `token`."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    stub = events_pb2_grpc.EventsStub(chan)
    yield stub

795 

796 

@contextmanager
def bugs_session(token=None):
    """Yield a Bugs stub for testing; auth token is optional (Bugs is open)."""
    chan = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    stub = bugs_pb2_grpc.BugsStub(chan)
    yield stub

802 

803 

@contextmanager
def resources_session():
    """Yield a Resources stub for testing, backed by an unauthenticated fake channel."""
    chan = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    stub = resources_pb2_grpc.ResourcesStub(chan)
    yield stub

809 

810 

@contextmanager
def media_session(bearer_token):
    """
    Create a fresh Media API for testing, uses the bearer token for media auth
    """
    with futures.ThreadPoolExecutor(1) as pool:
        # the media interceptor validates the bearer token instead of a session cookie
        server = grpc.server(pool, interceptors=[get_media_auth_interceptor(bearer_token), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), server)
        server.start()

        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.access_token_call_credentials(bearer_token),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            server.stop(None).wait()

833 

834 

@pytest.fixture(scope="class")
def testconfig():
    """
    Class-scoped fixture that overwrites the global config with deterministic
    testing values, and restores the original config on teardown.
    """
    # snapshot the current config so it can be restored after the test class runs
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config["IN_TEST"] = True

    config["DEV"] = True
    config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
    config["VERSION"] = "testing_version"
    config["BASE_URL"] = "http://localhost:3000"
    config["BACKEND_BASE_URL"] = "http://localhost:8888"
    config["CONSOLE_BASE_URL"] = "http://localhost:8888"
    config["COOKIE_DOMAIN"] = "localhost"

    # SMS is disabled in tests
    config["ENABLE_SMS"] = False
    config["SMS_SENDER_ID"] = "invalid"

    # real email sending is disabled; addresses below are deliberately .invalid
    config["ENABLE_EMAIL"] = False
    config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
    config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
    config["NOTIFICATION_PREFIX"] = "[TEST] "
    config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
    config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
    config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"

    # donations/Stripe are disabled and keys emptied
    config["ENABLE_DONATIONS"] = False
    config["STRIPE_API_KEY"] = ""
    config["STRIPE_WEBHOOK_SECRET"] = ""
    config["STRIPE_RECURRING_PRODUCT_ID"] = ""

    config["ENABLE_STRONG_VERIFICATION"] = False
    config["IRIS_ID_PUBKEY"] = ""
    config["IRIS_ID_SECRET"] = ""
    # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
    config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    config["SMTP_HOST"] = "localhost"
    config["SMTP_PORT"] = 587
    config["SMTP_USERNAME"] = "username"
    config["SMTP_PASSWORD"] = "password"

    config["ENABLE_MEDIA"] = True
    config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
        "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
    )
    config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
    config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001"
    config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001"

    config["BUG_TOOL_ENABLED"] = False
    config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
    config["BUG_TOOL_GITHUB_USERNAME"] = "user"
    config["BUG_TOOL_GITHUB_TOKEN"] = "token"

    config["LISTMONK_ENABLED"] = False
    config["LISTMONK_BASE_URL"] = "https://localhost"
    config["LISTMONK_API_USERNAME"] = "..."
    config["LISTMONK_API_KEY"] = "..."
    config["LISTMONK_LIST_ID"] = 3

    config["PUSH_NOTIFICATIONS_ENABLED"] = True
    config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
    config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"

    yield None

    # teardown: restore the pre-test config
    config.clear()
    config.update(prevconfig)

907 

908 

@pytest.fixture
def fast_passwords():
    """
    Password hashing, by design, takes a lot of time, which slows down the tests;
    patch the nacl pwhash primitives with trivial fakes to skip the hashing cost.
    """

    def fast_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def fast_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == fast_hash(password)

    with (
        patch("couchers.crypto.nacl.pwhash.verify", fast_verify),
        patch("couchers.crypto.nacl.pwhash.str", fast_hash),
    ):
        yield

923 

924 

def process_jobs():
    """Drain the background job queue: keep processing until no job remains."""
    while True:
        if not process_job():
            break

928 

929 

@contextmanager
def mock_notification_email():
    """
    Patch out the email queue and yield the mock so tests can inspect what
    would have been sent; drains the background job queue while the patch is
    active so notification jobs hit the mock.
    """
    with patch("couchers.email._queue_email") as mock:
        yield mock
        # run queued jobs before unpatching so their emails are captured by the mock
        process_jobs()

935 

936 

@dataclass
class EmailData:
    # Plain holder for the keyword arguments of one captured outgoing email,
    # as extracted by email_fields() below.
    sender_name: str
    sender_email: str
    recipient: str
    subject: str
    # plain-text body
    plain: str
    # HTML body
    html: str
    source_data: str
    list_unsubscribe_header: str

947 

948 

def email_fields(mock, call_ix=0):
    """Extract the keyword args of the `call_ix`-th captured email call as an EmailData."""
    _, kwargs = mock.call_args_list[call_ix]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    return EmailData(**{name: kwargs.get(name) for name in field_names})

961 

962 

@pytest.fixture
def push_collector():
    """
    Fixture that patches out push notification delivery and yields a collector
    recording every (user_id, push) pair, with assertion helpers.

    See test_SendTestPushNotification for an example on how to use this fixture
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            try:
                return self.kwargs[attr]
            except KeyError:
                # re-raise as AttributeError so attribute protocol semantics are preserved
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            # all Push objects recorded for this user, in send order
            return [kwargs for uid, kwargs in self.pushes if uid == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            # stand-in for couchers.notifications.push._push_to_user; just records the call
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            # check that the ix-th push for this user contains the given field values
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert push.kwargs[kwarg] == kwargs[kwarg], (
                    f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"
                )

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector