Coverage for src/tests/test_fixtures.py: 98%

636 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1import os 

2import re 

3from collections.abc import Generator, Sequence 

4from concurrent import futures 

5from contextlib import contextmanager 

6from dataclasses import dataclass 

7from datetime import date, timedelta 

8from pathlib import Path 

9from typing import Any 

10from unittest.mock import patch 

11 

12import grpc 

13import pytest 

14from grpc._server import _validate_generic_rpc_handlers 

15from sqlalchemy import Connection, Engine, create_engine, update 

16from sqlalchemy.orm import Session 

17from sqlalchemy.sql import or_, text 

18 

19from couchers.config import config 

20from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

21from couchers.context import make_interactive_context 

22from couchers.crypto import random_hex 

23from couchers.db import _get_base_engine, session_scope 

24from couchers.descriptor_pool import get_descriptor_pool 

25from couchers.interceptors import ( 

26 CouchersMiddlewareInterceptor, 

27 UserAuthInfo, 

28 _try_get_and_update_user_details, 

29) 

30from couchers.jobs.worker import process_job 

31from couchers.models import ( 

32 Base, 

33 FriendRelationship, 

34 FriendStatus, 

35 HostingStatus, 

36 LanguageAbility, 

37 LanguageFluency, 

38 MeetupStatus, 

39 ModerationUserList, 

40 PassportSex, 

41 PhotoGallery, 

42 RegionLived, 

43 RegionVisited, 

44 StrongVerificationAttempt, 

45 StrongVerificationAttemptStatus, 

46 Upload, 

47 User, 

48 UserBlock, 

49 UserSession, 

50) 

51from couchers.proto import ( 

52 account_pb2_grpc, 

53 admin_pb2_grpc, 

54 annotations_pb2, 

55 api_pb2_grpc, 

56 auth_pb2_grpc, 

57 blocking_pb2_grpc, 

58 bugs_pb2_grpc, 

59 communities_pb2_grpc, 

60 conversations_pb2_grpc, 

61 discussions_pb2_grpc, 

62 donations_pb2_grpc, 

63 editor_pb2_grpc, 

64 events_pb2_grpc, 

65 galleries_pb2_grpc, 

66 gis_pb2_grpc, 

67 groups_pb2_grpc, 

68 iris_pb2_grpc, 

69 jail_pb2_grpc, 

70 media_pb2_grpc, 

71 moderation_pb2, 

72 moderation_pb2_grpc, 

73 notifications_pb2_grpc, 

74 pages_pb2_grpc, 

75 postal_verification_pb2_grpc, 

76 public_pb2_grpc, 

77 references_pb2_grpc, 

78 reporting_pb2_grpc, 

79 requests_pb2_grpc, 

80 resources_pb2_grpc, 

81 search_pb2_grpc, 

82 stripe_pb2_grpc, 

83 threads_pb2_grpc, 

84) 

85from couchers.servicers.account import Account, Iris 

86from couchers.servicers.admin import Admin 

87from couchers.servicers.api import API 

88from couchers.servicers.auth import Auth, create_session 

89from couchers.servicers.blocking import Blocking 

90from couchers.servicers.bugs import Bugs 

91from couchers.servicers.communities import Communities 

92from couchers.servicers.conversations import Conversations 

93from couchers.servicers.discussions import Discussions 

94from couchers.servicers.donations import Donations, Stripe 

95from couchers.servicers.editor import Editor 

96from couchers.servicers.events import Events 

97from couchers.servicers.galleries import Galleries 

98from couchers.servicers.gis import GIS 

99from couchers.servicers.groups import Groups 

100from couchers.servicers.jail import Jail 

101from couchers.servicers.media import Media, get_media_auth_interceptor 

102from couchers.servicers.moderation import Moderation 

103from couchers.servicers.notifications import Notifications 

104from couchers.servicers.pages import Pages 

105from couchers.servicers.postal_verification import PostalVerification 

106from couchers.servicers.public import Public 

107from couchers.servicers.references import References 

108from couchers.servicers.reporting import Reporting 

109from couchers.servicers.requests import Requests 

110from couchers.servicers.resources import Resources 

111from couchers.servicers.search import Search 

112from couchers.servicers.threads import Threads 

113from couchers.sql import couchers_select as select 

114from couchers.utils import create_coordinate, now 

115 

116 

117def create_schema_from_models(engine: Engine | None = None) -> None: 

118 """ 

119 Create everything from the current models, not incrementally 

120 through migrations. 

121 """ 

122 if engine is None: 

123 engine = _get_base_engine() 

124 

125 # create sql functions (these are created in migrations otherwise) 

126 functions = Path(__file__).parent / "sql_functions.sql" 

127 with open(functions) as f, engine.connect() as conn: 

128 conn.execute(text(f.read())) 

129 conn.commit() 

130 

131 Base.metadata.create_all(engine) 

132 

133 

134def populate_testing_resources(conn: Connection) -> None: 

135 """ 

136 Testing version of couchers.resources.copy_resources_to_database 

137 """ 

138 conn.execute( 

139 text(""" 

140 INSERT INTO regions (code, name) VALUES 

141 ('AUS', 'Australia'), 

142 ('CAN', 'Canada'), 

143 ('CHE', 'Switzerland'), 

144 ('CUB', 'Cuba'), 

145 ('CXR', 'Christmas Island'), 

146 ('CZE', 'Czechia'), 

147 ('DEU', 'Germany'), 

148 ('EGY', 'Egypt'), 

149 ('ESP', 'Spain'), 

150 ('EST', 'Estonia'), 

151 ('FIN', 'Finland'), 

152 ('FRA', 'France'), 

153 ('GBR', 'United Kingdom'), 

154 ('GEO', 'Georgia'), 

155 ('GHA', 'Ghana'), 

156 ('GRC', 'Greece'), 

157 ('HKG', 'Hong Kong'), 

158 ('IRL', 'Ireland'), 

159 ('ISR', 'Israel'), 

160 ('ITA', 'Italy'), 

161 ('JPN', 'Japan'), 

162 ('LAO', 'Laos'), 

163 ('MEX', 'Mexico'), 

164 ('MMR', 'Myanmar'), 

165 ('NAM', 'Namibia'), 

166 ('NLD', 'Netherlands'), 

167 ('NZL', 'New Zealand'), 

168 ('POL', 'Poland'), 

169 ('PRK', 'North Korea'), 

170 ('REU', 'Réunion'), 

171 ('SGP', 'Singapore'), 

172 ('SWE', 'Sweden'), 

173 ('THA', 'Thailand'), 

174 ('TUR', 'Turkey'), 

175 ('TWN', 'Taiwan'), 

176 ('USA', 'United States'), 

177 ('VNM', 'Vietnam'); 

178 """) 

179 ) 

180 

181 # Insert languages as textual SQL 

182 conn.execute( 

183 text(""" 

184 INSERT INTO languages (code, name) VALUES 

185 ('arb', 'Arabic (Standard)'), 

186 ('deu', 'German'), 

187 ('eng', 'English'), 

188 ('fin', 'Finnish'), 

189 ('fra', 'French'), 

190 ('heb', 'Hebrew'), 

191 ('hun', 'Hungarian'), 

192 ('jpn', 'Japanese'), 

193 ('pol', 'Polish'), 

194 ('swe', 'Swedish'), 

195 ('cmn', 'Chinese (Mandarin)') 

196 """) 

197 ) 

198 

199 with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f: 

200 tz_sql = f.read() 

201 

202 conn.execute(text(tz_sql)) 

203 

204 

205def drop_database() -> None: 

206 with session_scope() as session: 

207 # postgis is required for all the Geographic Information System (GIS) stuff 

208 # pg_trgm is required for trigram-based search 

209 # btree_gist is required for gist-based exclusion constraints 

210 session.execute( 

211 text( 

212 "DROP SCHEMA IF EXISTS public CASCADE;" 

213 "DROP SCHEMA IF EXISTS logging CASCADE;" 

214 "DROP EXTENSION IF EXISTS postgis CASCADE;" 

215 "CREATE SCHEMA IF NOT EXISTS public;" 

216 "CREATE SCHEMA IF NOT EXISTS logging;" 

217 "CREATE EXTENSION postgis;" 

218 "CREATE EXTENSION pg_trgm;" 

219 "CREATE EXTENSION btree_gist;" 

220 ) 

221 ) 

222 

223 

224@contextmanager 

225def autocommit_engine(url: str): 

226 """ 

227 An engine that executes every statement in a transaction. Mainly needed 

228 because CREATE/DROP DATABASE cannot be executed any other way. 

229 """ 

230 engine = create_engine( 

231 url, 

232 isolation_level="AUTOCOMMIT", 

233 ) 

234 yield engine 

235 engine.dispose() 

236 

237 

238@pytest.fixture(scope="session") 

239def postgres_engine() -> Generator[Engine]: 

240 """ 

241 SQLAlchemy engine connected to "postgres" database. 

242 """ 

243 dsn = config["DATABASE_CONNECTION_STRING"] 

244 if not dsn.endswith("/testdb"): 

245 raise RuntimeError(f"DATABASE_CONNECTION_STRING must point to /testdb, but was {dsn}") 

246 

247 postgres_dsn = re.sub(r"/testdb$", "/postgres", dsn) 

248 

249 with autocommit_engine(postgres_dsn) as engine: 

250 yield engine 

251 

252 

253@pytest.fixture(scope="session") 

254def postgres_conn(postgres_engine: Engine) -> Generator[Connection]: 

255 """ 

256 Acquiring a connection takes time, so we cache it. 

257 """ 

258 with postgres_engine.connect() as conn: 

259 yield conn 

260 

261 

262@pytest.fixture(scope="session") 

263def template_db(postgres_conn: Connection) -> str: 

264 """ 

265 Creates a template database with all the extensions, tables, 

266 and static data (languages, regions.) This is done only once: then 

267 we copy this template for every test. It's much faster than creating 

268 a database without a template or deleting data from all tables between 

269 tests. The tables are created from SQLA metadata, not by running the 

270 migrations - again, for speed. 

271 """ 

272 # running in non-UTC catches some timezone errors 

273 os.environ["TZ"] = "America/New_York" 

274 

275 name = "couchers_template" 

276 

277 postgres_conn.execute(text(f"DROP DATABASE IF EXISTS {name}")) 

278 postgres_conn.execute(text(f"CREATE DATABASE {name}")) 

279 

280 template_dsn = re.sub( 

281 r"/testdb$", 

282 f"/{name}", 

283 config["DATABASE_CONNECTION_STRING"], 

284 ) 

285 

286 with autocommit_engine(template_dsn) as engine: 

287 with engine.connect() as conn: 

288 conn.execute( 

289 text( 

290 "CREATE SCHEMA logging;" 

291 "CREATE EXTENSION IF NOT EXISTS postgis;" 

292 "CREATE EXTENSION IF NOT EXISTS pg_trgm;" 

293 "CREATE EXTENSION IF NOT EXISTS btree_gist;" 

294 ) 

295 ) 

296 

297 create_schema_from_models(engine) 

298 populate_testing_resources(conn) 

299 

300 return name 

301 

302 

303@pytest.fixture 

304def db(template_db: str, postgres_conn: Connection) -> None: 

305 """ 

306 Creates a fresh database for a test by copying a template. The template has 

307 the migrations applied and is populated with static data (regions, languages, etc.) 

308 """ 

309 postgres_conn.execute(text("DROP DATABASE IF EXISTS testdb WITH (FORCE)")) 

310 postgres_conn.execute(text(f"CREATE DATABASE testdb WITH TEMPLATE {template_db}")) 

311 

312 

313@pytest.fixture(scope="class") 

314def db_class(template_db: str, postgres_conn: Connection) -> None: 

315 """ 

316 The same as above, but with a different scope. Used in test_communities.py. 

317 """ 

318 postgres_conn.execute(text("DROP DATABASE IF EXISTS testdb WITH (FORCE)")) 

319 postgres_conn.execute(text(f"CREATE DATABASE testdb WITH TEMPLATE {template_db}")) 

320 

321 

322class _MockCouchersContext: 

323 @property 

324 def headers(self): 

325 return {} 

326 

327 

328def generate_user( 

329 *, 

330 delete_user=False, 

331 complete_profile=True, 

332 strong_verification=False, 

333 regions_visited: Sequence[str] = (), 

334 regions_lived: Sequence[str] = (), 

335 language_abilities: Sequence[tuple[str, LanguageFluency]] = (), 

336 **kwargs: Any, 

337) -> tuple[User, str]: 

338 """ 

339 Create a new user, return session token 

340 

341 The user is detached from any session, and you can access its static attributes, but you can't modify it 

342 

343 Use this most of the time 

344 """ 

345 with session_scope() as session: 

346 # Ensure superusers are also editors (DB constraint) 

347 if kwargs.get("is_superuser") and "is_editor" not in kwargs: 

348 kwargs["is_editor"] = True 

349 

350 # default args 

351 username = "test_user_" + random_hex(16) 

352 user_opts = { 

353 "username": username, 

354 "email": f"{username}@dev.couchers.org", 

355 # password is just 'password' 

356 # this is hardcoded because the password is slow to hash (so would slow down tests otherwise) 

357 "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg", 

358 "name": username.capitalize(), 

359 "hosting_status": HostingStatus.cant_host, 

360 "meetup_status": MeetupStatus.open_to_meetup, 

361 "city": "Testing city", 

362 "hometown": "Test hometown", 

363 "community_standing": 0.5, 

364 "birthdate": date(year=2000, month=1, day=1), 

365 "gender": "Woman", 

366 "pronouns": "", 

367 "occupation": "Tester", 

368 "education": "UST(esting)", 

369 "about_me": "I test things", 

370 "things_i_like": "Code", 

371 "about_place": "My place has a lot of testing paraphenelia", 

372 "additional_information": "I can be a bit testy", 

373 # you need to make sure to update this logic to make sure the user is jailed/not on request 

374 "accepted_tos": TOS_VERSION, 

375 "accepted_community_guidelines": GUIDELINES_VERSION, 

376 "geom": create_coordinate(40.7108, -73.9740), 

377 "geom_radius": 100, 

378 "onboarding_emails_sent": 1, 

379 "last_onboarding_email_sent": now(), 

380 "last_donated": now(), 

381 } | kwargs 

382 

383 user = User(**user_opts) 

384 session.add(user) 

385 session.flush() 

386 

387 # Create a profile gallery for the user and link it 

388 profile_gallery = PhotoGallery(owner_user_id=user.id) 

389 session.add(profile_gallery) 

390 session.flush() 

391 user.profile_gallery_id = profile_gallery.id 

392 

393 for region in regions_visited: 

394 session.add(RegionVisited(user_id=user.id, region_code=region)) 

395 

396 for region in regions_lived: 

397 session.add(RegionLived(user_id=user.id, region_code=region)) 

398 

399 for lang, fluency in language_abilities: 

400 session.add(LanguageAbility(user_id=user.id, language_code=lang, fluency=fluency)) 

401 

402 # this expires the user, so now it's "dirty" 

403 token, _ = create_session(_MockCouchersContext(), session, user, False, set_cookie=False) 

404 

405 # deleted user aborts session creation, hence this follows and necessitates a second commit 

406 if delete_user: 

407 user.is_deleted = True 

408 

409 user.recommendation_score = 1e10 - user.id 

410 

411 if complete_profile: 

412 key = random_hex(32) 

413 filename = random_hex(32) + ".jpg" 

414 session.add( 

415 Upload( 

416 key=key, 

417 filename=filename, 

418 creator_user_id=user.id, 

419 ) 

420 ) 

421 session.flush() 

422 user.avatar_key = key 

423 user.about_me = "I have a complete profile!\n" * 20 

424 

425 if strong_verification: 

426 attempt = StrongVerificationAttempt( 

427 verification_attempt_token=f"verification_attempt_token_{user.id}", 

428 user_id=user.id, 

429 status=StrongVerificationAttemptStatus.succeeded, 

430 has_full_data=True, 

431 passport_encrypted_data=b"not real", 

432 passport_date_of_birth=user.birthdate, 

433 passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get( 

434 user.gender, PassportSex.unspecified 

435 ), 

436 has_minimal_data=True, 

437 passport_expiry_date=date.today() + timedelta(days=10), 

438 passport_nationality="UTO", 

439 passport_last_three_document_chars=f"{user.id:03}", 

440 iris_token=f"iris_token_{user.id}", 

441 iris_session_id=user.id, 

442 ) 

443 session.add(attempt) 

444 session.flush() 

445 assert attempt.has_strong_verification(user) 

446 

447 session.commit() 

448 

449 assert user.has_completed_profile == complete_profile 

450 

451 # refresh it, undoes the expiry 

452 session.refresh(user) 

453 

454 # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it 

455 user.timezone # noqa: B018 

456 

457 # allows detaches the user from the session, allowing its use outside this session 

458 session.expunge(user) 

459 

460 return user, token 

461 

462 

463def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]: 

464 user_id = session.execute(select(User).where(User.username == username)).scalar_one().id 

465 token = session.execute(select(UserSession).where(UserSession.user_id == user_id)).scalar_one().token 

466 return user_id, token 

467 

468 

469def make_friends(user1: User, user2: User) -> None: 

470 with session_scope() as session: 

471 friend_relationship = FriendRelationship( 

472 from_user_id=user1.id, 

473 to_user_id=user2.id, 

474 status=FriendStatus.accepted, 

475 ) 

476 session.add(friend_relationship) 

477 

478 

479def make_user_block(user1: User, user2: User) -> None: 

480 with session_scope() as session: 

481 user_block = UserBlock( 

482 blocking_user_id=user1.id, 

483 blocked_user_id=user2.id, 

484 ) 

485 session.add(user_block) 

486 

487 

488def make_user_invisible(user_id: int) -> None: 

489 with session_scope() as session: 

490 session.execute(update(User).where(User.id == user_id).values(is_banned=True)) 

491 

492 

493# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship 

494def get_friend_relationship(user1: User, user2: User) -> FriendRelationship: 

495 with session_scope() as session: 

496 friend_relationship = session.execute( 

497 select(FriendRelationship).where( 

498 or_( 

499 (FriendRelationship.from_user_id == user1.id and FriendRelationship.to_user_id == user2.id), 

500 (FriendRelationship.from_user_id == user2.id and FriendRelationship.to_user_id == user1.id), 

501 ) 

502 ) 

503 ).scalar_one_or_none() 

504 

505 session.expunge(friend_relationship) 

506 return friend_relationship 

507 

508 

509def add_users_to_new_moderation_list(users: list[User]) -> int: 

510 """Group users as duplicated accounts""" 

511 with session_scope() as session: 

512 moderation_user_list = ModerationUserList() 

513 session.add(moderation_user_list) 

514 session.flush() 

515 for user in users: 

516 refreshed_user = session.get(User, user.id) 

517 moderation_user_list.users.append(refreshed_user) 

518 return moderation_user_list.id 

519 

520 

521class CookieMetadataPlugin(grpc.AuthMetadataPlugin): 

522 """ 

523 Injects the right `cookie: couchers-sesh=...` header into the metadata 

524 """ 

525 

526 def __init__(self, token: str): 

527 self.token = token 

528 

529 def __call__(self, context, callback) -> None: 

530 callback((("cookie", f"couchers-sesh={self.token}"),), None) 

531 

532 

533class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor): 

534 def __init__(self): 

535 self.latest_headers = {} 

536 

537 def intercept_unary_unary(self, continuation, client_call_details, request): 

538 call = continuation(client_call_details, request) 

539 self.latest_headers = dict(call.initial_metadata()) 

540 self.latest_header_raw = call.initial_metadata() 

541 return call 

542 

543 

544@contextmanager 

545def auth_api_session( 

546 grpc_channel_options=(), 

547) -> Generator[tuple[auth_pb2_grpc.AuthStub, grpc.UnaryUnaryClientInterceptor]]: 

548 """ 

549 Create an Auth API for testing 

550 

551 This needs to use the real server since it plays around with headers 

552 """ 

553 with futures.ThreadPoolExecutor(1) as executor: 

554 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

555 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

556 auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server) 

557 server.start() 

558 

559 try: 

560 with grpc.secure_channel( 

561 f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options 

562 ) as channel: 

563 metadata_interceptor = _MetadataKeeperInterceptor() 

564 channel = grpc.intercept_channel(channel, metadata_interceptor) 

565 yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor 

566 finally: 

567 server.stop(None).wait() 

568 

569 

570@contextmanager 

571def api_session(token): 

572 """ 

573 Create an API for testing, uses the token for auth 

574 """ 

575 channel = FakeChannel(token) 

576 api_pb2_grpc.add_APIServicer_to_server(API(), channel) 

577 yield api_pb2_grpc.APIStub(channel) 

578 

579 

580@contextmanager 

581def real_api_session(token): 

582 """ 

583 Create an API for testing, using TCP sockets, uses the token for auth 

584 """ 

585 with futures.ThreadPoolExecutor(1) as executor: 

586 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

587 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

588 api_pb2_grpc.add_APIServicer_to_server(API(), server) 

589 server.start() 

590 

591 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

592 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

593 

594 try: 

595 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

596 yield api_pb2_grpc.APIStub(channel) 

597 finally: 

598 server.stop(None).wait() 

599 

600 

601@contextmanager 

602def real_admin_session(token): 

603 """ 

604 Create a Admin service for testing, using TCP sockets, uses the token for auth 

605 """ 

606 with futures.ThreadPoolExecutor(1) as executor: 

607 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

608 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

609 admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server) 

610 server.start() 

611 

612 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

613 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

614 

615 try: 

616 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

617 yield admin_pb2_grpc.AdminStub(channel) 

618 finally: 

619 server.stop(None).wait() 

620 

621 

622@contextmanager 

623def real_editor_session(token): 

624 """ 

625 Create an Editor service for testing, using TCP sockets, uses the token for auth 

626 """ 

627 with futures.ThreadPoolExecutor(1) as executor: 

628 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

629 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

630 editor_pb2_grpc.add_EditorServicer_to_server(Editor(), server) 

631 server.start() 

632 

633 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

634 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

635 

636 try: 

637 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

638 yield editor_pb2_grpc.EditorStub(channel) 

639 finally: 

640 server.stop(None).wait() 

641 

642 

643@contextmanager 

644def real_moderation_session(token): 

645 """ 

646 Create a Moderation service for testing, using TCP sockets, uses the token for auth 

647 """ 

648 with futures.ThreadPoolExecutor(1) as executor: 

649 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

650 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

651 moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), server) 

652 server.start() 

653 

654 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

655 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

656 

657 try: 

658 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

659 yield moderation_pb2_grpc.ModerationStub(channel) 

660 finally: 

661 server.stop(None).wait() 

662 

663 

664@contextmanager 

665def real_account_session(token: str): 

666 """ 

667 Create a Account service for testing, using TCP sockets, uses the token for auth 

668 """ 

669 with futures.ThreadPoolExecutor(1) as executor: 

670 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

671 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

672 account_pb2_grpc.add_AccountServicer_to_server(Account(), server) 

673 server.start() 

674 

675 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

676 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

677 

678 try: 

679 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

680 yield account_pb2_grpc.AccountStub(channel) 

681 finally: 

682 server.stop(None).wait() 

683 

684 

685@contextmanager 

686def real_jail_session(token: str): 

687 """ 

688 Create a Jail service for testing, using TCP sockets, uses the token for auth 

689 """ 

690 with futures.ThreadPoolExecutor(1) as executor: 

691 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

692 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

693 jail_pb2_grpc.add_JailServicer_to_server(Jail(), server) 

694 server.start() 

695 

696 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

697 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

698 

699 try: 

700 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

701 yield jail_pb2_grpc.JailStub(channel) 

702 finally: 

703 server.stop(None).wait() 

704 

705 

706@contextmanager 

707def gis_session(token): 

708 channel = FakeChannel(token) 

709 gis_pb2_grpc.add_GISServicer_to_server(GIS(), channel) 

710 yield gis_pb2_grpc.GISStub(channel) 

711 

712 

713@contextmanager 

714def public_session(): 

715 channel = FakeChannel() 

716 public_pb2_grpc.add_PublicServicer_to_server(Public(), channel) 

717 yield public_pb2_grpc.PublicStub(channel) 

718 

719 

720class FakeRpcError(grpc.RpcError): 

721 def __init__(self, code, details): 

722 self._code = code 

723 self._details = details 

724 

725 def code(self): 

726 return self._code 

727 

728 def details(self): 

729 return self._details 

730 

731 

732def _check_user_perms(method: str, user_id: int, is_jailed: bool, is_editor: bool, is_superuser: bool) -> None: 

733 # method is of the form "/org.couchers.api.core.API/GetUser" 

734 _, service_name, method_name = method.split("/") 

735 

736 service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions() 

737 auth_level = service_options.Extensions[annotations_pb2.auth_level] 

738 assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN 

739 assert auth_level in [ 

740 annotations_pb2.AUTH_LEVEL_OPEN, 

741 annotations_pb2.AUTH_LEVEL_JAILED, 

742 annotations_pb2.AUTH_LEVEL_SECURE, 

743 annotations_pb2.AUTH_LEVEL_EDITOR, 

744 annotations_pb2.AUTH_LEVEL_ADMIN, 

745 ] 

746 

747 if not user_id: 

748 assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN 

749 else: 

750 assert not (auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser), ( 

751 "Non-superuser tried to call superuser API" 

752 ) 

753 assert not (auth_level == annotations_pb2.AUTH_LEVEL_EDITOR and not is_editor), ( 

754 "Non-editor tried to call editor API" 

755 ) 

756 assert not ( 

757 is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED] 

758 ), "User is jailed but tried to call non-open/non-jailed API" 

759 

760 

761class MockGrpcContext: 

762 """ 

763 Pure mock of grpc.ServicerContext for testing. 

764 """ 

765 

766 def __init__(self): 

767 self._initial_metadata = [] 

768 self._invocation_metadata = [] 

769 

770 def abort(self, code, details): 

771 raise FakeRpcError(code, details) 

772 

773 def invocation_metadata(self): 

774 return self._invocation_metadata 

775 

776 def send_initial_metadata(self, metadata): 

777 self._initial_metadata.extend(metadata) 

778 

779 

780class FakeChannel: 

781 """ 

782 Mock gRPC channel for testing that orchestrates context creation. 

783 

784 This holds test state (token) and creates proper CouchersContext 

785 instances when handlers are invoked. 

786 """ 

787 

788 def __init__(self, token=None): 

789 self.handlers = {} 

790 self._token = token 

791 

792 def add_generic_rpc_handlers(self, generic_rpc_handlers): 

793 _validate_generic_rpc_handlers(generic_rpc_handlers) 

794 self.handlers.update(generic_rpc_handlers[0]._method_handlers) 

795 

796 def unary_unary(self, uri, request_serializer, response_deserializer): 

797 handler = self.handlers[uri] 

798 

799 def fake_handler(request): 

800 auth_info: UserAuthInfo | None = None 

801 if self._token: 

802 auth_info = _try_get_and_update_user_details( 

803 self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent" 

804 ) 

805 

806 _check_user_perms( 

807 uri, 

808 auth_info.user_id if auth_info else None, 

809 auth_info.is_jailed if auth_info else None, 

810 auth_info.is_editor if auth_info else None, 

811 auth_info.is_superuser if auth_info else None, 

812 ) 

813 

814 # Do a full serialization cycle on the request and the 

815 # response to catch accidental use of unserializable data. 

816 request = handler.request_deserializer(request_serializer(request)) 

817 

818 with session_scope() as session: 

819 mock_grpc_ctx = MockGrpcContext() 

820 

821 context = make_interactive_context( 

822 grpc_context=mock_grpc_ctx, 

823 user_id=auth_info.user_id if auth_info else None, 

824 is_api_key=False, 

825 token=self._token if auth_info else None, 

826 ui_language_preference=auth_info.ui_language_preference if auth_info else None, 

827 ) 

828 

829 response = handler.unary_unary(request, context, session) 

830 

831 return response_deserializer(handler.response_serializer(response)) 

832 

833 return fake_handler 

834 

835 

836@contextmanager 

837def conversations_session(token): 

838 """ 

839 Create a Conversations API for testing, uses the token for auth 

840 """ 

841 channel = FakeChannel(token) 

842 conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), channel) 

843 yield conversations_pb2_grpc.ConversationsStub(channel) 

844 

845 

846@contextmanager 

847def requests_session(token): 

848 """ 

849 Create a Requests API for testing, uses the token for auth 

850 """ 

851 channel = FakeChannel(token) 

852 requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), channel) 

853 yield requests_pb2_grpc.RequestsStub(channel) 

854 

855 

856@contextmanager 

857def threads_session(token): 

858 channel = FakeChannel(token) 

859 threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), channel) 

860 yield threads_pb2_grpc.ThreadsStub(channel) 

861 

862 

863@contextmanager 

864def discussions_session(token): 

865 channel = FakeChannel(token) 

866 discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), channel) 

867 yield discussions_pb2_grpc.DiscussionsStub(channel) 

868 

869 

870@contextmanager 

871def donations_session(token): 

872 channel = FakeChannel(token) 

873 donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), channel) 

874 yield donations_pb2_grpc.DonationsStub(channel) 

875 

876 

877@contextmanager 

878def real_stripe_session(): 

879 """ 

880 Create a Stripe service for testing, using TCP sockets 

881 """ 

882 with futures.ThreadPoolExecutor(1) as executor: 

883 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

884 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

885 stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server) 

886 server.start() 

887 

888 creds = grpc.local_channel_credentials() 

889 

890 try: 

891 with grpc.secure_channel(f"localhost:{port}", creds) as channel: 

892 yield stripe_pb2_grpc.StripeStub(channel) 

893 finally: 

894 server.stop(None).wait() 

895 

896 

897@contextmanager 

898def real_iris_session(): 

899 with futures.ThreadPoolExecutor(1) as executor: 

900 server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

901 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

902 iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server) 

903 server.start() 

904 

905 creds = grpc.local_channel_credentials() 

906 

907 try: 

908 with grpc.secure_channel(f"localhost:{port}", creds) as channel: 

909 yield iris_pb2_grpc.IrisStub(channel) 

910 finally: 

911 server.stop(None).wait() 

912 

913 

914@contextmanager 

915def pages_session(token): 

916 channel = FakeChannel(token) 

917 pages_pb2_grpc.add_PagesServicer_to_server(Pages(), channel) 

918 yield pages_pb2_grpc.PagesStub(channel) 

919 

920 

921@contextmanager 

922def communities_session(token): 

923 channel = FakeChannel(token) 

924 communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), channel) 

925 yield communities_pb2_grpc.CommunitiesStub(channel) 

926 

927 

928@contextmanager 

929def groups_session(token): 

930 channel = FakeChannel(token) 

931 groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), channel) 

932 yield groups_pb2_grpc.GroupsStub(channel) 

933 

934 

935@contextmanager 

936def blocking_session(token): 

937 channel = FakeChannel(token) 

938 blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), channel) 

939 yield blocking_pb2_grpc.BlockingStub(channel) 

940 

941 

942@contextmanager 

943def notifications_session(token): 

944 channel = FakeChannel(token) 

945 notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), channel) 

946 yield notifications_pb2_grpc.NotificationsStub(channel) 

947 

948 

949@contextmanager 

950def account_session(token): 

951 """ 

952 Create a Account API for testing, uses the token for auth 

953 """ 

954 channel = FakeChannel(token) 

955 account_pb2_grpc.add_AccountServicer_to_server(Account(), channel) 

956 yield account_pb2_grpc.AccountStub(channel) 

957 

958 

959@contextmanager 

960def search_session(token): 

961 """ 

962 Create a Search API for testing, uses the token for auth 

963 """ 

964 channel = FakeChannel(token) 

965 search_pb2_grpc.add_SearchServicer_to_server(Search(), channel) 

966 yield search_pb2_grpc.SearchStub(channel) 

967 

968 

969@contextmanager 

970def references_session(token): 

971 """ 

972 Create a References API for testing, uses the token for auth 

973 """ 

974 channel = FakeChannel(token) 

975 references_pb2_grpc.add_ReferencesServicer_to_server(References(), channel) 

976 yield references_pb2_grpc.ReferencesStub(channel) 

977 

978 

979@contextmanager 

980def galleries_session(token): 

981 """ 

982 Create a Galleries API for testing, uses the token for auth 

983 """ 

984 channel = FakeChannel(token) 

985 galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), channel) 

986 yield galleries_pb2_grpc.GalleriesStub(channel) 

987 

988 

989@contextmanager 

990def reporting_session(token): 

991 channel = FakeChannel(token) 

992 reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), channel) 

993 yield reporting_pb2_grpc.ReportingStub(channel) 

994 

995 

996@contextmanager 

997def events_session(token): 

998 channel = FakeChannel(token) 

999 events_pb2_grpc.add_EventsServicer_to_server(Events(), channel) 

1000 yield events_pb2_grpc.EventsStub(channel) 

1001 

1002 

1003@contextmanager 

1004def postal_verification_session(token): 

1005 channel = FakeChannel(token) 

1006 postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), channel) 

1007 yield postal_verification_pb2_grpc.PostalVerificationStub(channel) 

1008 

1009 

1010@contextmanager 

1011def bugs_session(token=None): 

1012 channel = FakeChannel(token) 

1013 bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), channel) 

1014 yield bugs_pb2_grpc.BugsStub(channel) 

1015 

1016 

1017@contextmanager 

1018def resources_session(): 

1019 channel = FakeChannel() 

1020 resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), channel) 

1021 yield resources_pb2_grpc.ResourcesStub(channel) 

1022 

1023 

1024@contextmanager 

1025def media_session(bearer_token): 

1026 """ 

1027 Create a fresh Media API for testing, uses the bearer token for media auth 

1028 """ 

1029 media_auth_interceptor = get_media_auth_interceptor(bearer_token) 

1030 

1031 with futures.ThreadPoolExecutor(1) as executor: 

1032 server = grpc.server(executor, interceptors=[media_auth_interceptor]) 

1033 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

1034 servicer = Media() 

1035 media_pb2_grpc.add_MediaServicer_to_server(servicer, server) 

1036 server.start() 

1037 

1038 call_creds = grpc.access_token_call_credentials(bearer_token) 

1039 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

1040 

1041 try: 

1042 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

1043 yield media_pb2_grpc.MediaStub(channel) 

1044 finally: 

1045 server.stop(None).wait() 

1046 

1047 

1048@pytest.fixture(scope="class") 

1049def testconfig(): 

1050 prevconfig = config.copy() 

1051 config.clear() 

1052 config.update(prevconfig) 

1053 

1054 config["IN_TEST"] = True 

1055 

1056 config["DEV"] = True 

1057 config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b") 

1058 config["VERSION"] = "testing_version" 

1059 config["BASE_URL"] = "http://localhost:3000" 

1060 config["BACKEND_BASE_URL"] = "http://localhost:8888" 

1061 config["CONSOLE_BASE_URL"] = "http://localhost:8888" 

1062 config["COOKIE_DOMAIN"] = "localhost" 

1063 

1064 config["ENABLE_SMS"] = False 

1065 config["SMS_SENDER_ID"] = "invalid" 

1066 

1067 config["ENABLE_EMAIL"] = False 

1068 config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org" 

1069 config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid" 

1070 config["NOTIFICATION_PREFIX"] = "[TEST] " 

1071 config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid" 

1072 config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid" 

1073 config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid" 

1074 

1075 config["ENABLE_DONATIONS"] = False 

1076 config["STRIPE_API_KEY"] = "" 

1077 config["STRIPE_WEBHOOK_SECRET"] = "" 

1078 config["STRIPE_RECURRING_PRODUCT_ID"] = "" 

1079 

1080 config["ENABLE_STRONG_VERIFICATION"] = False 

1081 config["IRIS_ID_PUBKEY"] = "" 

1082 config["IRIS_ID_SECRET"] = "" 

1083 # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272 

1084 config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex( 

1085 "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f" 

1086 ) 

1087 

1088 config["ENABLE_POSTAL_VERIFICATION"] = False 

1089 

1090 config["SMTP_HOST"] = "localhost" 

1091 config["SMTP_PORT"] = 587 

1092 config["SMTP_USERNAME"] = "username" 

1093 config["SMTP_PASSWORD"] = "password" 

1094 

1095 config["ENABLE_MEDIA"] = True 

1096 config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex( 

1097 "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc" 

1098 ) 

1099 config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e" 

1100 config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001" 

1101 config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001" 

1102 

1103 config["BUG_TOOL_ENABLED"] = False 

1104 config["BUG_TOOL_GITHUB_REPO"] = "org/repo" 

1105 config["BUG_TOOL_GITHUB_USERNAME"] = "user" 

1106 config["BUG_TOOL_GITHUB_TOKEN"] = "token" 

1107 

1108 config["LISTMONK_ENABLED"] = False 

1109 config["LISTMONK_BASE_URL"] = "https://localhost" 

1110 config["LISTMONK_API_USERNAME"] = "..." 

1111 config["LISTMONK_API_KEY"] = "..." 

1112 config["LISTMONK_LIST_ID"] = 3 

1113 

1114 config["PUSH_NOTIFICATIONS_ENABLED"] = True 

1115 config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI" 

1116 config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid" 

1117 

1118 config["ACTIVENESS_PROBES_ENABLED"] = True 

1119 

1120 config["RECAPTHCA_ENABLED"] = False 

1121 config["RECAPTHCA_PROJECT_ID"] = "..." 

1122 config["RECAPTHCA_API_KEY"] = "..." 

1123 config["RECAPTHCA_SITE_KEY"] = "..." 

1124 

1125 config["EXPERIMENTATION_ENABLED"] = False 

1126 config["EXPERIMENTATION_PASS_ALL_GATES"] = True 

1127 config["STATSIG_SERVER_SECRET_KEY"] = "" 

1128 config["STATSIG_ENVIRONMENT"] = "testing" 

1129 

1130 # Moderation auto-approval deadline - 0 disables, set in tests that need it 

1131 config["MODERATION_AUTO_APPROVE_DEADLINE_SECONDS"] = 0 

1132 # Bot user ID for automated moderation - will be set to a real user in tests that need it 

1133 config["MODERATION_BOT_USER_ID"] = 1 

1134 

1135 # Dev APIs disabled by default in tests 

1136 config["ENABLE_DEV_APIS"] = False 

1137 

1138 yield None 

1139 

1140 config.clear() 

1141 config.update(prevconfig) 

1142 

1143 

1144def run_migration_test(): 

1145 return os.environ.get("RUN_MIGRATION_TEST", "false").lower() == "true" 

1146 

1147 

1148@pytest.fixture 

1149def fast_passwords(): 

1150 # password hashing, by design, takes a lot of time, which slows down the tests. here we jump through some hoops to 

1151 # make this fast by removing the hashing step 

1152 

1153 def fast_hash(password: bytes) -> bytes: 

1154 return b"fake hash:" + password 

1155 

1156 def fast_verify(hashed: bytes, password: bytes) -> bool: 

1157 return hashed == fast_hash(password) 

1158 

1159 with patch("couchers.crypto.nacl.pwhash.verify", fast_verify): 

1160 with patch("couchers.crypto.nacl.pwhash.str", fast_hash): 

1161 yield 

1162 

1163 

1164def process_jobs(): 

1165 while process_job(): 

1166 pass 

1167 

1168 

1169@contextmanager 

1170def mock_notification_email(): 

1171 with patch("couchers.email._queue_email") as mock: 

1172 yield mock 

1173 process_jobs() 

1174 

1175 

1176@dataclass 

1177class EmailData: 

1178 sender_name: str 

1179 sender_email: str 

1180 recipient: str 

1181 subject: str 

1182 plain: str 

1183 html: str 

1184 source_data: str 

1185 list_unsubscribe_header: str 

1186 

1187 

1188def email_fields(mock, call_ix=0): 

1189 _, kw = mock.call_args_list[call_ix] 

1190 return EmailData( 

1191 sender_name=kw.get("sender_name"), 

1192 sender_email=kw.get("sender_email"), 

1193 recipient=kw.get("recipient"), 

1194 subject=kw.get("subject"), 

1195 plain=kw.get("plain"), 

1196 html=kw.get("html"), 

1197 source_data=kw.get("source_data"), 

1198 list_unsubscribe_header=kw.get("list_unsubscribe_header"), 

1199 ) 

1200 

1201 

1202class Push: 

1203 """ 

1204 This allows nice access to the push info via e.g. push.title instead of push["title"] 

1205 """ 

1206 

1207 def __init__(self, kwargs): 

1208 self.kwargs = kwargs 

1209 

1210 def __getattr__(self, attr): 

1211 try: 

1212 return self.kwargs[attr] 

1213 except KeyError: 

1214 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None 

1215 

1216 def __repr__(self): 

1217 kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items()) 

1218 return f"Push({kwargs_disp})" 

1219 

1220 

1221class PushCollector: 

1222 def __init__(self): 

1223 # pairs of (user_id, push) 

1224 self.pushes = [] 

1225 

1226 def by_user(self, user_id): 

1227 return [kwargs for uid, kwargs in self.pushes if uid == user_id] 

1228 

1229 def push_to_user(self, session, user_id, **kwargs): 

1230 self.pushes.append((user_id, Push(kwargs=kwargs))) 

1231 

1232 def assert_user_has_count(self, user_id, count): 

1233 assert len(self.by_user(user_id)) == count 

1234 

1235 def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs): 

1236 push = self.by_user(user_id)[ix] 

1237 for kwarg in kwargs: 

1238 assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'" 

1239 assert push.kwargs[kwarg] == kwargs[kwarg], ( 

1240 f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'" 

1241 ) 

1242 

1243 def assert_user_has_single_matching(self, user_id, **kwargs): 

1244 self.assert_user_has_count(user_id, 1) 

1245 self.assert_user_push_matches_fields(user_id, ix=0, **kwargs) 

1246 

1247 

1248@pytest.fixture 

1249def push_collector(): 

1250 """ 

1251 See test_SendTestPushNotification for an example on how to use this fixture 

1252 """ 

1253 collector = PushCollector() 

1254 

1255 with patch("couchers.notifications.push._push_to_user", collector.push_to_user): 

1256 yield collector 

1257 

1258 

1259class Moderator: 

1260 """ 

1261 A test fixture that provides a moderator user and methods to exercise the moderation API. 

1262 

1263 Usage: 

1264 def test_example(db, moderator): 

1265 user, token = generate_user() 

1266 # ... create a host request ... 

1267 moderator.approve_host_request(host_request_id) 

1268 """ 

1269 

1270 def __init__(self, user: User, token: str): 

1271 self.user = user 

1272 self.token = token 

1273 

1274 def approve_host_request(self, host_request_id: int, reason: str = "Test approval") -> None: 

1275 """ 

1276 Approve a host request using the moderation API. 

1277 

1278 Args: 

1279 host_request_id: The conversation_id of the host request 

1280 reason: Optional reason for approval 

1281 """ 

1282 with real_moderation_session(self.token) as api: 

1283 state_res = api.GetModerationState( 

1284 moderation_pb2.GetModerationStateReq( 

1285 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST, 

1286 object_id=host_request_id, 

1287 ) 

1288 ) 

1289 api.ModerateContent( 

1290 moderation_pb2.ModerateContentReq( 

1291 moderation_state_id=state_res.moderation_state.moderation_state_id, 

1292 action=moderation_pb2.MODERATION_ACTION_APPROVE, 

1293 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

1294 reason=reason, 

1295 ) 

1296 ) 

1297 

1298 def approve_group_chat(self, group_chat_id: int, reason: str = "Test approval") -> None: 

1299 """ 

1300 Approve a group chat using the moderation API. 

1301 

1302 Args: 

1303 group_chat_id: The conversation_id of the group chat 

1304 reason: Optional reason for approval 

1305 """ 

1306 with real_moderation_session(self.token) as api: 

1307 state_res = api.GetModerationState( 

1308 moderation_pb2.GetModerationStateReq( 

1309 object_type=moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT, 

1310 object_id=group_chat_id, 

1311 ) 

1312 ) 

1313 api.ModerateContent( 

1314 moderation_pb2.ModerateContentReq( 

1315 moderation_state_id=state_res.moderation_state.moderation_state_id, 

1316 action=moderation_pb2.MODERATION_ACTION_APPROVE, 

1317 visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

1318 reason=reason, 

1319 ) 

1320 ) 

1321 

1322 

1323@pytest.fixture 

1324def moderator(): 

1325 """ 

1326 Creates a moderator (superuser) and provides methods to exercise the moderation API. 

1327 

1328 Usage: 

1329 def test_example(db, moderator): 

1330 # ... create a host request ... 

1331 moderator.approve_host_request(host_request_id) 

1332 """ 

1333 user, token = generate_user(is_superuser=True) 

1334 yield Moderator(user, token)