Coverage for src/tests/test_fixtures.py: 99%

548 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 20:25 +0000

1import os 

2from concurrent import futures 

3from contextlib import contextmanager 

4from dataclasses import dataclass 

5from datetime import date, timedelta 

6from pathlib import Path 

7from unittest.mock import patch 

8 

9import grpc 

10import pytest 

11from sqlalchemy.orm import close_all_sessions 

12from sqlalchemy.sql import or_, text 

13 

14from couchers.config import config 

15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

16from couchers.crypto import random_hex 

17from couchers.db import _get_base_engine, session_scope 

18from couchers.descriptor_pool import get_descriptor_pool 

19from couchers.interceptors import ( 

20 CouchersMiddlewareInterceptor, 

21 _try_get_and_update_user_details, 

22) 

23from couchers.jobs.worker import process_job 

24from couchers.models import ( 

25 Base, 

26 FriendRelationship, 

27 FriendStatus, 

28 HostingStatus, 

29 Language, 

30 LanguageAbility, 

31 LanguageFluency, 

32 MeetupStatus, 

33 ModerationUserList, 

34 PassportSex, 

35 Region, 

36 RegionLived, 

37 RegionVisited, 

38 StrongVerificationAttempt, 

39 StrongVerificationAttemptStatus, 

40 Upload, 

41 User, 

42 UserBlock, 

43 UserSession, 

44) 

45from couchers.servicers.account import Account, Iris 

46from couchers.servicers.admin import Admin 

47from couchers.servicers.api import API 

48from couchers.servicers.auth import Auth, create_session 

49from couchers.servicers.blocking import Blocking 

50from couchers.servicers.bugs import Bugs 

51from couchers.servicers.communities import Communities 

52from couchers.servicers.conversations import Conversations 

53from couchers.servicers.discussions import Discussions 

54from couchers.servicers.donations import Donations, Stripe 

55from couchers.servicers.events import Events 

56from couchers.servicers.gis import GIS 

57from couchers.servicers.groups import Groups 

58from couchers.servicers.jail import Jail 

59from couchers.servicers.media import Media, get_media_auth_interceptor 

60from couchers.servicers.notifications import Notifications 

61from couchers.servicers.pages import Pages 

62from couchers.servicers.public import Public 

63from couchers.servicers.references import References 

64from couchers.servicers.reporting import Reporting 

65from couchers.servicers.requests import Requests 

66from couchers.servicers.resources import Resources 

67from couchers.servicers.search import Search 

68from couchers.servicers.threads import Threads 

69from couchers.sql import couchers_select as select 

70from couchers.utils import create_coordinate, now 

71from proto import ( 

72 account_pb2_grpc, 

73 admin_pb2_grpc, 

74 annotations_pb2, 

75 api_pb2_grpc, 

76 auth_pb2_grpc, 

77 blocking_pb2_grpc, 

78 bugs_pb2_grpc, 

79 communities_pb2_grpc, 

80 conversations_pb2_grpc, 

81 discussions_pb2_grpc, 

82 donations_pb2_grpc, 

83 events_pb2_grpc, 

84 gis_pb2_grpc, 

85 groups_pb2_grpc, 

86 iris_pb2_grpc, 

87 jail_pb2_grpc, 

88 media_pb2_grpc, 

89 notifications_pb2_grpc, 

90 pages_pb2_grpc, 

91 public_pb2_grpc, 

92 references_pb2_grpc, 

93 reporting_pb2_grpc, 

94 requests_pb2_grpc, 

95 resources_pb2_grpc, 

96 search_pb2_grpc, 

97 stripe_pb2_grpc, 

98 threads_pb2_grpc, 

99) 

100 

101 

102def truncate_all_tables(): 

103 """drop everything currently in the database""" 

104 with session_scope() as session: 

105 for table in Base.metadata.tables.values(): 

106 if table.name in ("regions", "languages", "timezone_areas"): 

107 continue 

108 name = f'"{table.schema}"."{table.name}"' if table.schema else f'"{table.name}"' 

109 session.execute(text(f"TRUNCATE TABLE {name} RESTART IDENTITY CASCADE")) 

110 

111 # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise 

112 # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585" 

113 # and similar errors 

114 _get_base_engine().dispose() 

115 

116 close_all_sessions() 

117 

118 

119def create_schema_from_models(): 

120 """ 

121 Create everything from the current models, not incrementally 

122 through migrations. 

123 """ 

124 

125 # create sql functions (these are created in migrations otherwise) 

126 functions = Path(__file__).parent / "sql_functions.sql" 

127 with open(functions) as f, session_scope() as session: 

128 session.execute(text(f.read())) 

129 

130 Base.metadata.create_all(_get_base_engine()) 

131 

132 

133def populate_testing_resources(session): 

134 """ 

135 Testing version of couchers.resources.copy_resources_to_database 

136 """ 

137 regions = [ 

138 ("AUS", "Australia"), 

139 ("CAN", "Canada"), 

140 ("CHE", "Switzerland"), 

141 ("CUB", "Cuba"), 

142 ("CXR", "Christmas Island"), 

143 ("CZE", "Czechia"), 

144 ("DEU", "Germany"), 

145 ("EGY", "Egypt"), 

146 ("ESP", "Spain"), 

147 ("EST", "Estonia"), 

148 ("FIN", "Finland"), 

149 ("FRA", "France"), 

150 ("GBR", "United Kingdom"), 

151 ("GEO", "Georgia"), 

152 ("GHA", "Ghana"), 

153 ("GRC", "Greece"), 

154 ("HKG", "Hong Kong"), 

155 ("IRL", "Ireland"), 

156 ("ISR", "Israel"), 

157 ("ITA", "Italy"), 

158 ("JPN", "Japan"), 

159 ("LAO", "Laos"), 

160 ("MEX", "Mexico"), 

161 ("MMR", "Myanmar"), 

162 ("NAM", "Namibia"), 

163 ("NLD", "Netherlands"), 

164 ("NZL", "New Zealand"), 

165 ("POL", "Poland"), 

166 ("PRK", "North Korea"), 

167 ("REU", "Réunion"), 

168 ("SGP", "Singapore"), 

169 ("SWE", "Sweden"), 

170 ("THA", "Thailand"), 

171 ("TUR", "Turkey"), 

172 ("TWN", "Taiwan"), 

173 ("USA", "United States"), 

174 ("VNM", "Vietnam"), 

175 ] 

176 

177 languages = [ 

178 ("arb", "Arabic (Standard)"), 

179 ("deu", "German"), 

180 ("eng", "English"), 

181 ("fin", "Finnish"), 

182 ("fra", "French"), 

183 ("heb", "Hebrew"), 

184 ("hun", "Hungarian"), 

185 ("jpn", "Japanese"), 

186 ("pol", "Polish"), 

187 ("swe", "Swedish"), 

188 ("cmn", "Chinese (Mandarin)"), 

189 ] 

190 

191 with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f: 

192 tz_sql = f.read() 

193 

194 for code, name in regions: 

195 session.add(Region(code=code, name=name)) 

196 

197 for code, name in languages: 

198 session.add(Language(code=code, name=name)) 

199 

200 session.execute(text(tz_sql)) 

201 

202 

203def drop_database() -> None: 

204 with session_scope() as session: 

205 # postgis is required for all the Geographic Information System (GIS) stuff 

206 # pg_trgm is required for trigram-based search 

207 # btree_gist is required for gist-based exclusion constraints 

208 session.execute( 

209 text( 

210 "DROP SCHEMA IF EXISTS public CASCADE;" 

211 "DROP SCHEMA IF EXISTS logging CASCADE;" 

212 "DROP EXTENSION IF EXISTS postgis CASCADE;" 

213 "CREATE SCHEMA public;" 

214 "CREATE SCHEMA logging;" 

215 "CREATE EXTENSION postgis;" 

216 "CREATE EXTENSION pg_trgm;" 

217 "CREATE EXTENSION btree_gist;" 

218 ) 

219 ) 

220 

221 

222def recreate_database(): 

223 # running in non-UTC catches some timezone errors 

224 os.environ["TZ"] = "America/New_York" 

225 

226 drop_database() 

227 

228 # create everything from the current models, not incrementally through migrations 

229 create_schema_from_models() 

230 

231 with session_scope() as session: 

232 populate_testing_resources(session) 

233 

234 

235@pytest.fixture(scope="session") 

236def create_database(): 

237 recreate_database() 

238 

239 

240@pytest.fixture 

241def db(create_database): 

242 """ 

243 Pytest fixture to connect to a running Postgres database and build it using metadata.create_all() 

244 """ 

245 truncate_all_tables() 

246 

247 

248def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs): 

249 """ 

250 Create a new user, return session token 

251 

252 The user is detached from any session, and you can access its static attributes, but you can't modify it 

253 

254 Use this most of the time 

255 """ 

256 auth = Auth() 

257 

258 with session_scope() as session: 

259 # default args 

260 username = "test_user_" + random_hex(16) 

261 user_opts = { 

262 "username": username, 

263 "email": f"{username}@dev.couchers.org", 

264 # password is just 'password' 

265 # this is hardcoded because the password is slow to hash (so would slow down tests otherwise) 

266 "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg", 

267 "name": username.capitalize(), 

268 "hosting_status": HostingStatus.cant_host, 

269 "meetup_status": MeetupStatus.open_to_meetup, 

270 "city": "Testing city", 

271 "hometown": "Test hometown", 

272 "community_standing": 0.5, 

273 "birthdate": date(year=2000, month=1, day=1), 

274 "gender": "Woman", 

275 "pronouns": "", 

276 "occupation": "Tester", 

277 "education": "UST(esting)", 

278 "about_me": "I test things", 

279 "things_i_like": "Code", 

280 "about_place": "My place has a lot of testing paraphenelia", 

281 "additional_information": "I can be a bit testy", 

282 # you need to make sure to update this logic to make sure the user is jailed/not on request 

283 "accepted_tos": TOS_VERSION, 

284 "accepted_community_guidelines": GUIDELINES_VERSION, 

285 "geom": create_coordinate(40.7108, -73.9740), 

286 "geom_radius": 100, 

287 "onboarding_emails_sent": 1, 

288 "last_onboarding_email_sent": now(), 

289 "has_donated": True, 

290 } 

291 

292 for key, value in kwargs.items(): 

293 user_opts[key] = value 

294 

295 user = User(**user_opts) 

296 session.add(user) 

297 session.flush() 

298 

299 session.add(RegionVisited(user_id=user.id, region_code="CHE")) 

300 session.add(RegionVisited(user_id=user.id, region_code="REU")) 

301 session.add(RegionVisited(user_id=user.id, region_code="FIN")) 

302 

303 session.add(RegionLived(user_id=user.id, region_code="ESP")) 

304 session.add(RegionLived(user_id=user.id, region_code="FRA")) 

305 session.add(RegionLived(user_id=user.id, region_code="EST")) 

306 

307 session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent)) 

308 session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner)) 

309 

310 # this expires the user, so now it's "dirty" 

311 session.commit() 

312 

313 class _MockCouchersContext: 

314 @property 

315 def headers(self): 

316 return {} 

317 

318 token, _ = create_session(_MockCouchersContext(), session, user, False, set_cookie=False) 

319 

320 # deleted user aborts session creation, hence this follows and necessitates a second commit 

321 if delete_user: 

322 user.is_deleted = True 

323 

324 user.recommendation_score = 1e10 - user.id 

325 

326 if complete_profile: 

327 key = random_hex(32) 

328 filename = random_hex(32) + ".jpg" 

329 session.add( 

330 Upload( 

331 key=key, 

332 filename=filename, 

333 creator_user_id=user.id, 

334 ) 

335 ) 

336 session.flush() 

337 user.avatar_key = key 

338 user.about_me = "I have a complete profile!\n" * 20 

339 

340 if strong_verification: 

341 attempt = StrongVerificationAttempt( 

342 verification_attempt_token=f"verification_attempt_token_{user.id}", 

343 user_id=user.id, 

344 status=StrongVerificationAttemptStatus.succeeded, 

345 has_full_data=True, 

346 passport_encrypted_data=b"not real", 

347 passport_date_of_birth=user.birthdate, 

348 passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get( 

349 user.gender, PassportSex.unspecified 

350 ), 

351 has_minimal_data=True, 

352 passport_expiry_date=date.today() + timedelta(days=10), 

353 passport_nationality="UTO", 

354 passport_last_three_document_chars=f"{user.id:03}", 

355 iris_token=f"iris_token_{user.id}", 

356 iris_session_id=user.id, 

357 ) 

358 session.add(attempt) 

359 session.flush() 

360 assert attempt.has_strong_verification(user) 

361 

362 session.commit() 

363 

364 assert user.has_completed_profile == complete_profile 

365 

366 # refresh it, undoes the expiry 

367 session.refresh(user) 

368 

369 # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it 

370 user.timezone # noqa: B018 

371 

372 # allows detaches the user from the session, allowing its use outside this session 

373 session.expunge(user) 

374 

375 return user, token 

376 

377 

378def get_user_id_and_token(session, username): 

379 user_id = session.execute(select(User).where(User.username == username)).scalar_one().id 

380 token = session.execute(select(UserSession).where(UserSession.user_id == user_id)).scalar_one().token 

381 return user_id, token 

382 

383 

384def make_friends(user1, user2): 

385 with session_scope() as session: 

386 friend_relationship = FriendRelationship( 

387 from_user_id=user1.id, 

388 to_user_id=user2.id, 

389 status=FriendStatus.accepted, 

390 ) 

391 session.add(friend_relationship) 

392 

393 

394def make_user_block(user1, user2): 

395 with session_scope() as session: 

396 user_block = UserBlock( 

397 blocking_user_id=user1.id, 

398 blocked_user_id=user2.id, 

399 ) 

400 session.add(user_block) 

401 session.commit() 

402 

403 

404def make_user_invisible(user_id): 

405 with session_scope() as session: 

406 session.execute(select(User).where(User.id == user_id)).scalar_one().is_banned = True 

407 

408 

409# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship 

410def get_friend_relationship(user1, user2): 

411 with session_scope() as session: 

412 friend_relationship = session.execute( 

413 select(FriendRelationship).where( 

414 or_( 

415 (FriendRelationship.from_user_id == user1.id and FriendRelationship.to_user_id == user2.id), 

416 (FriendRelationship.from_user_id == user2.id and FriendRelationship.to_user_id == user1.id), 

417 ) 

418 ) 

419 ).scalar_one_or_none() 

420 

421 session.expunge(friend_relationship) 

422 return friend_relationship 

423 

424 

425def add_users_to_new_moderation_list(users): 

426 """Group users as duplicated accounts""" 

427 with session_scope() as session: 

428 moderation_user_list = ModerationUserList() 

429 session.add(moderation_user_list) 

430 session.flush() 

431 for user in users: 

432 refreshed_user = session.get(User, user.id) 

433 moderation_user_list.users.append(refreshed_user) 

434 return moderation_user_list.id 

435 

436 

437class CookieMetadataPlugin(grpc.AuthMetadataPlugin): 

438 """ 

439 Injects the right `cookie: couchers-sesh=...` header into the metadata 

440 """ 

441 

442 def __init__(self, token): 

443 self.token = token 

444 

445 def __call__(self, context, callback): 

446 callback((("cookie", f"couchers-sesh={self.token}"),), None) 

447 

448 

449@contextmanager 

450def auth_api_session(grpc_channel_options=()): 

451 """ 

452 Create an Auth API for testing 

453 

454 This needs to use the real server since it plays around with headers 

455 """ 

456 with futures.ThreadPoolExecutor(1) as executor: 

457 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

458 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

459 auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server) 

460 server.start() 

461 

462 try: 

463 with grpc.secure_channel( 

464 f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options 

465 ) as channel: 

466 

467 class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor): 

468 def __init__(self): 

469 self.latest_headers = {} 

470 

471 def intercept_unary_unary(self, continuation, client_call_details, request): 

472 call = continuation(client_call_details, request) 

473 self.latest_headers = dict(call.initial_metadata()) 

474 self.latest_header_raw = call.initial_metadata() 

475 return call 

476 

477 metadata_interceptor = _MetadataKeeperInterceptor() 

478 channel = grpc.intercept_channel(channel, metadata_interceptor) 

479 yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor 

480 finally: 

481 server.stop(None).wait() 

482 

483 

484@contextmanager 

485def api_session(token): 

486 """ 

487 Create an API for testing, uses the token for auth 

488 """ 

489 channel = fake_channel(token) 

490 api_pb2_grpc.add_APIServicer_to_server(API(), channel) 

491 yield api_pb2_grpc.APIStub(channel) 

492 

493 

494@contextmanager 

495def real_api_session(token): 

496 """ 

497 Create an API for testing, using TCP sockets, uses the token for auth 

498 """ 

499 with futures.ThreadPoolExecutor(1) as executor: 

500 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

501 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

502 api_pb2_grpc.add_APIServicer_to_server(API(), server) 

503 server.start() 

504 

505 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

506 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

507 

508 try: 

509 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

510 yield api_pb2_grpc.APIStub(channel) 

511 finally: 

512 server.stop(None).wait() 

513 

514 

515@contextmanager 

516def real_admin_session(token): 

517 """ 

518 Create a Admin service for testing, using TCP sockets, uses the token for auth 

519 """ 

520 with futures.ThreadPoolExecutor(1) as executor: 

521 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

522 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

523 admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server) 

524 server.start() 

525 

526 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

527 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

528 

529 try: 

530 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

531 yield admin_pb2_grpc.AdminStub(channel) 

532 finally: 

533 server.stop(None).wait() 

534 

535 

536@contextmanager 

537def real_account_session(token): 

538 """ 

539 Create a Account service for testing, using TCP sockets, uses the token for auth 

540 """ 

541 with futures.ThreadPoolExecutor(1) as executor: 

542 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

543 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

544 account_pb2_grpc.add_AccountServicer_to_server(Account(), server) 

545 server.start() 

546 

547 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

548 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

549 

550 try: 

551 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

552 yield account_pb2_grpc.AccountStub(channel) 

553 finally: 

554 server.stop(None).wait() 

555 

556 

557@contextmanager 

558def real_jail_session(token): 

559 """ 

560 Create a Jail service for testing, using TCP sockets, uses the token for auth 

561 """ 

562 with futures.ThreadPoolExecutor(1) as executor: 

563 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

564 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

565 jail_pb2_grpc.add_JailServicer_to_server(Jail(), server) 

566 server.start() 

567 

568 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

569 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

570 

571 try: 

572 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

573 yield jail_pb2_grpc.JailStub(channel) 

574 finally: 

575 server.stop(None).wait() 

576 

577 

578@contextmanager 

579def gis_session(token): 

580 channel = fake_channel(token) 

581 gis_pb2_grpc.add_GISServicer_to_server(GIS(), channel) 

582 yield gis_pb2_grpc.GISStub(channel) 

583 

584 

585@contextmanager 

586def public_session(): 

587 channel = fake_channel() 

588 public_pb2_grpc.add_PublicServicer_to_server(Public(), channel) 

589 yield public_pb2_grpc.PublicStub(channel) 

590 

591 

592class FakeRpcError(grpc.RpcError): 

593 def __init__(self, code, details): 

594 self._code = code 

595 self._details = details 

596 

597 def code(self): 

598 return self._code 

599 

600 def details(self): 

601 return self._details 

602 

603 

604def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry): 

605 # method is of the form "/org.couchers.api.core.API/GetUser" 

606 _, service_name, method_name = method.split("/") 

607 

608 service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions() 

609 auth_level = service_options.Extensions[annotations_pb2.auth_level] 

610 assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN 

611 assert auth_level in [ 

612 annotations_pb2.AUTH_LEVEL_OPEN, 

613 annotations_pb2.AUTH_LEVEL_JAILED, 

614 annotations_pb2.AUTH_LEVEL_SECURE, 

615 annotations_pb2.AUTH_LEVEL_ADMIN, 

616 ] 

617 

618 if not user_id: 

619 assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN 

620 else: 

621 assert not (auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser), ( 

622 "Non-superuser tried to call superuser API" 

623 ) 

624 assert not ( 

625 is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED] 

626 ), "User is jailed but tried to call non-open/non-jailed API" 

627 

628 

629class FakeChannel: 

630 def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None): 

631 self.handlers = {} 

632 self.user_id = user_id 

633 self._is_jailed = is_jailed 

634 self._is_superuser = is_superuser 

635 self._token_expiry = token_expiry 

636 

637 def is_logged_in(self): 

638 return self.user_id is not None 

639 

640 def abort(self, code, details): 

641 raise FakeRpcError(code, details) 

642 

643 def add_generic_rpc_handlers(self, generic_rpc_handlers): 

644 from grpc._server import _validate_generic_rpc_handlers 

645 

646 _validate_generic_rpc_handlers(generic_rpc_handlers) 

647 

648 self.handlers.update(generic_rpc_handlers[0]._method_handlers) 

649 

650 def unary_unary(self, uri, request_serializer, response_deserializer): 

651 handler = self.handlers[uri] 

652 

653 _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry) 

654 

655 def fake_handler(request): 

656 # Do a full serialization cycle on the request and the 

657 # response to catch accidental use of unserializable data. 

658 request = handler.request_deserializer(request_serializer(request)) 

659 

660 with session_scope() as session: 

661 response = handler.unary_unary(request, self, session) 

662 

663 return response_deserializer(handler.response_serializer(response)) 

664 

665 return fake_handler 

666 

667 

668def fake_channel(token=None): 

669 if token: 

670 user_id, is_jailed, is_superuser, token_expiry, ui_language_preference = _try_get_and_update_user_details( 

671 token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent" 

672 ) 

673 return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry) 

674 return FakeChannel() 

675 

676 

677@contextmanager 

678def conversations_session(token): 

679 """ 

680 Create a Conversations API for testing, uses the token for auth 

681 """ 

682 channel = fake_channel(token) 

683 conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), channel) 

684 yield conversations_pb2_grpc.ConversationsStub(channel) 

685 

686 

687@contextmanager 

688def requests_session(token): 

689 """ 

690 Create a Requests API for testing, uses the token for auth 

691 """ 

692 channel = fake_channel(token) 

693 requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), channel) 

694 yield requests_pb2_grpc.RequestsStub(channel) 

695 

696 

697@contextmanager 

698def threads_session(token): 

699 channel = fake_channel(token) 

700 threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), channel) 

701 yield threads_pb2_grpc.ThreadsStub(channel) 

702 

703 

704@contextmanager 

705def discussions_session(token): 

706 channel = fake_channel(token) 

707 discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), channel) 

708 yield discussions_pb2_grpc.DiscussionsStub(channel) 

709 

710 

711@contextmanager 

712def donations_session(token): 

713 channel = fake_channel(token) 

714 donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), channel) 

715 yield donations_pb2_grpc.DonationsStub(channel) 

716 

717 

718@contextmanager 

719def real_stripe_session(): 

720 """ 

721 Create a Stripe service for testing, using TCP sockets 

722 """ 

723 with futures.ThreadPoolExecutor(1) as executor: 

724 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

725 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

726 stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server) 

727 server.start() 

728 

729 creds = grpc.local_channel_credentials() 

730 

731 try: 

732 with grpc.secure_channel(f"localhost:{port}", creds) as channel: 

733 yield stripe_pb2_grpc.StripeStub(channel) 

734 finally: 

735 server.stop(None).wait() 

736 

737 

738@contextmanager 

739def real_iris_session(): 

740 with futures.ThreadPoolExecutor(1) as executor: 

741 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

742 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

743 iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server) 

744 server.start() 

745 

746 creds = grpc.local_channel_credentials() 

747 

748 try: 

749 with grpc.secure_channel(f"localhost:{port}", creds) as channel: 

750 yield iris_pb2_grpc.IrisStub(channel) 

751 finally: 

752 server.stop(None).wait() 

753 

754 

755@contextmanager 

756def pages_session(token): 

757 channel = fake_channel(token) 

758 pages_pb2_grpc.add_PagesServicer_to_server(Pages(), channel) 

759 yield pages_pb2_grpc.PagesStub(channel) 

760 

761 

762@contextmanager 

763def communities_session(token): 

764 channel = fake_channel(token) 

765 communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), channel) 

766 yield communities_pb2_grpc.CommunitiesStub(channel) 

767 

768 

769@contextmanager 

770def groups_session(token): 

771 channel = fake_channel(token) 

772 groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), channel) 

773 yield groups_pb2_grpc.GroupsStub(channel) 

774 

775 

776@contextmanager 

777def blocking_session(token): 

778 channel = fake_channel(token) 

779 blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), channel) 

780 yield blocking_pb2_grpc.BlockingStub(channel) 

781 

782 

783@contextmanager 

784def notifications_session(token): 

785 channel = fake_channel(token) 

786 notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), channel) 

787 yield notifications_pb2_grpc.NotificationsStub(channel) 

788 

789 

790@contextmanager 

791def account_session(token): 

792 """ 

793 Create a Account API for testing, uses the token for auth 

794 """ 

795 channel = fake_channel(token) 

796 account_pb2_grpc.add_AccountServicer_to_server(Account(), channel) 

797 yield account_pb2_grpc.AccountStub(channel) 

798 

799 

800@contextmanager 

801def search_session(token): 

802 """ 

803 Create a Search API for testing, uses the token for auth 

804 """ 

805 channel = fake_channel(token) 

806 search_pb2_grpc.add_SearchServicer_to_server(Search(), channel) 

807 yield search_pb2_grpc.SearchStub(channel) 

808 

809 

810@contextmanager 

811def references_session(token): 

812 """ 

813 Create a References API for testing, uses the token for auth 

814 """ 

815 channel = fake_channel(token) 

816 references_pb2_grpc.add_ReferencesServicer_to_server(References(), channel) 

817 yield references_pb2_grpc.ReferencesStub(channel) 

818 

819 

820@contextmanager 

821def reporting_session(token): 

822 channel = fake_channel(token) 

823 reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), channel) 

824 yield reporting_pb2_grpc.ReportingStub(channel) 

825 

826 

827@contextmanager 

828def events_session(token): 

829 channel = fake_channel(token) 

830 events_pb2_grpc.add_EventsServicer_to_server(Events(), channel) 

831 yield events_pb2_grpc.EventsStub(channel) 

832 

833 

834@contextmanager 

835def bugs_session(token=None): 

836 channel = fake_channel(token) 

837 bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), channel) 

838 yield bugs_pb2_grpc.BugsStub(channel) 

839 

840 

841@contextmanager 

842def resources_session(): 

843 channel = fake_channel() 

844 resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), channel) 

845 yield resources_pb2_grpc.ResourcesStub(channel) 

846 

847 

848@contextmanager 

849def media_session(bearer_token): 

850 """ 

851 Create a fresh Media API for testing, uses the bearer token for media auth 

852 """ 

853 media_auth_interceptor = get_media_auth_interceptor(bearer_token) 

854 

855 with futures.ThreadPoolExecutor(1) as executor: 

856 server = grpc.server(executor, interceptors=[media_auth_interceptor]) 

857 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

858 servicer = Media() 

859 media_pb2_grpc.add_MediaServicer_to_server(servicer, server) 

860 server.start() 

861 

862 call_creds = grpc.access_token_call_credentials(bearer_token) 

863 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

864 

865 try: 

866 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

867 yield media_pb2_grpc.MediaStub(channel) 

868 finally: 

869 server.stop(None).wait() 

870 

871 

872@pytest.fixture(scope="class") 

873def testconfig(): 

874 prevconfig = config.copy() 

875 config.clear() 

876 config.update(prevconfig) 

877 

878 config["IN_TEST"] = True 

879 

880 config["DEV"] = True 

881 config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b") 

882 config["VERSION"] = "testing_version" 

883 config["BASE_URL"] = "http://localhost:3000" 

884 config["BACKEND_BASE_URL"] = "http://localhost:8888" 

885 config["CONSOLE_BASE_URL"] = "http://localhost:8888" 

886 config["COOKIE_DOMAIN"] = "localhost" 

887 

888 config["ENABLE_SMS"] = False 

889 config["SMS_SENDER_ID"] = "invalid" 

890 

891 config["ENABLE_EMAIL"] = False 

892 config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org" 

893 config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid" 

894 config["NOTIFICATION_PREFIX"] = "[TEST] " 

895 config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid" 

896 config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid" 

897 config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid" 

898 

899 config["ENABLE_DONATIONS"] = False 

900 config["STRIPE_API_KEY"] = "" 

901 config["STRIPE_WEBHOOK_SECRET"] = "" 

902 config["STRIPE_RECURRING_PRODUCT_ID"] = "" 

903 

904 config["ENABLE_STRONG_VERIFICATION"] = False 

905 config["IRIS_ID_PUBKEY"] = "" 

906 config["IRIS_ID_SECRET"] = "" 

907 # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272 

908 config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex( 

909 "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f" 

910 ) 

911 

912 config["SMTP_HOST"] = "localhost" 

913 config["SMTP_PORT"] = 587 

914 config["SMTP_USERNAME"] = "username" 

915 config["SMTP_PASSWORD"] = "password" 

916 

917 config["ENABLE_MEDIA"] = True 

918 config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex( 

919 "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc" 

920 ) 

921 config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e" 

922 config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001" 

923 config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001" 

924 

925 config["BUG_TOOL_ENABLED"] = False 

926 config["BUG_TOOL_GITHUB_REPO"] = "org/repo" 

927 config["BUG_TOOL_GITHUB_USERNAME"] = "user" 

928 config["BUG_TOOL_GITHUB_TOKEN"] = "token" 

929 

930 config["LISTMONK_ENABLED"] = False 

931 config["LISTMONK_BASE_URL"] = "https://localhost" 

932 config["LISTMONK_API_USERNAME"] = "..." 

933 config["LISTMONK_API_KEY"] = "..." 

934 config["LISTMONK_LIST_ID"] = 3 

935 

936 config["PUSH_NOTIFICATIONS_ENABLED"] = True 

937 config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI" 

938 config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid" 

939 

940 config["ACTIVENESS_PROBES_ENABLED"] = True 

941 

942 config["RECAPTHCA_ENABLED"] = False 

943 config["RECAPTHCA_PROJECT_ID"] = "..." 

944 config["RECAPTHCA_API_KEY"] = "..." 

945 config["RECAPTHCA_SITE_KEY"] = "..." 

946 

947 yield None 

948 

949 config.clear() 

950 config.update(prevconfig) 

951 

952 

953def run_migration_test(): 

954 return os.environ.get("RUN_MIGRATION_TEST", "false").lower() == "true" 

955 

956 

957@pytest.fixture 

958def fast_passwords(): 

959 # password hashing, by design, takes a lot of time, which slows down the tests. here we jump through some hoops to 

960 # make this fast by removing the hashing step 

961 

962 def fast_hash(password: bytes) -> bytes: 

963 return b"fake hash:" + password 

964 

965 def fast_verify(hashed: bytes, password: bytes) -> bool: 

966 return hashed == fast_hash(password) 

967 

968 with patch("couchers.crypto.nacl.pwhash.verify", fast_verify): 

969 with patch("couchers.crypto.nacl.pwhash.str", fast_hash): 

970 yield 

971 

972 

973def process_jobs(): 

974 while process_job(): 

975 pass 

976 

977 

978@contextmanager 

979def mock_notification_email(): 

980 with patch("couchers.email._queue_email") as mock: 

981 yield mock 

982 process_jobs() 

983 

984 

985@dataclass 

986class EmailData: 

987 sender_name: str 

988 sender_email: str 

989 recipient: str 

990 subject: str 

991 plain: str 

992 html: str 

993 source_data: str 

994 list_unsubscribe_header: str 

995 

996 

997def email_fields(mock, call_ix=0): 

998 _, kw = mock.call_args_list[call_ix] 

999 return EmailData( 

1000 sender_name=kw.get("sender_name"), 

1001 sender_email=kw.get("sender_email"), 

1002 recipient=kw.get("recipient"), 

1003 subject=kw.get("subject"), 

1004 plain=kw.get("plain"), 

1005 html=kw.get("html"), 

1006 source_data=kw.get("source_data"), 

1007 list_unsubscribe_header=kw.get("list_unsubscribe_header"), 

1008 ) 

1009 

1010 

1011@pytest.fixture 

1012def push_collector(): 

1013 """ 

1014 See test_SendTestPushNotification for an example on how to use this fixture 

1015 """ 

1016 

1017 class Push: 

1018 """ 

1019 This allows nice access to the push info via e.g. push.title instead of push["title"] 

1020 """ 

1021 

1022 def __init__(self, kwargs): 

1023 self.kwargs = kwargs 

1024 

1025 def __getattr__(self, attr): 

1026 try: 

1027 return self.kwargs[attr] 

1028 except KeyError: 

1029 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None 

1030 

1031 def __repr__(self): 

1032 kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items()) 

1033 return f"Push({kwargs_disp})" 

1034 

1035 class PushCollector: 

1036 def __init__(self): 

1037 # pairs of (user_id, push) 

1038 self.pushes = [] 

1039 

1040 def by_user(self, user_id): 

1041 return [kwargs for uid, kwargs in self.pushes if uid == user_id] 

1042 

1043 def push_to_user(self, session, user_id, **kwargs): 

1044 self.pushes.append((user_id, Push(kwargs=kwargs))) 

1045 

1046 def assert_user_has_count(self, user_id, count): 

1047 assert len(self.by_user(user_id)) == count 

1048 

1049 def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs): 

1050 push = self.by_user(user_id)[ix] 

1051 for kwarg in kwargs: 

1052 assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'" 

1053 assert push.kwargs[kwarg] == kwargs[kwarg], ( 

1054 f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'" 

1055 ) 

1056 

1057 def assert_user_has_single_matching(self, user_id, **kwargs): 

1058 self.assert_user_has_count(user_id, 1) 

1059 self.assert_user_push_matches_fields(user_id, ix=0, **kwargs) 

1060 

1061 collector = PushCollector() 

1062 

1063 with patch("couchers.notifications.push._push_to_user", collector.push_to_user): 

1064 yield collector