Coverage for src/tests/test_fixtures.py: 99%

538 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-09-14 15:31 +0000

1import os 

2from concurrent import futures 

3from contextlib import contextmanager 

4from dataclasses import dataclass 

5from datetime import date, timedelta 

6from pathlib import Path 

7from unittest.mock import patch 

8 

9import grpc 

10import pytest 

11from sqlalchemy.orm import close_all_sessions 

12from sqlalchemy.sql import or_, text 

13 

14from couchers.config import config 

15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

16from couchers.crypto import random_hex 

17from couchers.db import _get_base_engine, session_scope 

18from couchers.descriptor_pool import get_descriptor_pool 

19from couchers.interceptors import ( 

20 CouchersMiddlewareInterceptor, 

21 _try_get_and_update_user_details, 

22) 

23from couchers.jobs.worker import process_job 

24from couchers.models import ( 

25 Base, 

26 FriendRelationship, 

27 FriendStatus, 

28 HostingStatus, 

29 Language, 

30 LanguageAbility, 

31 LanguageFluency, 

32 MeetupStatus, 

33 ModerationUserList, 

34 PassportSex, 

35 Region, 

36 RegionLived, 

37 RegionVisited, 

38 StrongVerificationAttempt, 

39 StrongVerificationAttemptStatus, 

40 Upload, 

41 User, 

42 UserBlock, 

43 UserSession, 

44) 

45from couchers.servicers.account import Account, Iris 

46from couchers.servicers.admin import Admin 

47from couchers.servicers.api import API 

48from couchers.servicers.auth import Auth, create_session 

49from couchers.servicers.blocking import Blocking 

50from couchers.servicers.bugs import Bugs 

51from couchers.servicers.communities import Communities 

52from couchers.servicers.conversations import Conversations 

53from couchers.servicers.discussions import Discussions 

54from couchers.servicers.donations import Donations, Stripe 

55from couchers.servicers.events import Events 

56from couchers.servicers.gis import GIS 

57from couchers.servicers.groups import Groups 

58from couchers.servicers.jail import Jail 

59from couchers.servicers.media import Media, get_media_auth_interceptor 

60from couchers.servicers.notifications import Notifications 

61from couchers.servicers.pages import Pages 

62from couchers.servicers.public import Public 

63from couchers.servicers.references import References 

64from couchers.servicers.reporting import Reporting 

65from couchers.servicers.requests import Requests 

66from couchers.servicers.resources import Resources 

67from couchers.servicers.search import Search 

68from couchers.servicers.threads import Threads 

69from couchers.sql import couchers_select as select 

70from couchers.utils import create_coordinate, now 

71from proto import ( 

72 account_pb2_grpc, 

73 admin_pb2_grpc, 

74 annotations_pb2, 

75 api_pb2_grpc, 

76 auth_pb2_grpc, 

77 blocking_pb2_grpc, 

78 bugs_pb2_grpc, 

79 communities_pb2_grpc, 

80 conversations_pb2_grpc, 

81 discussions_pb2_grpc, 

82 donations_pb2_grpc, 

83 events_pb2_grpc, 

84 gis_pb2_grpc, 

85 groups_pb2_grpc, 

86 iris_pb2_grpc, 

87 jail_pb2_grpc, 

88 media_pb2_grpc, 

89 notifications_pb2_grpc, 

90 pages_pb2_grpc, 

91 public_pb2_grpc, 

92 references_pb2_grpc, 

93 reporting_pb2_grpc, 

94 requests_pb2_grpc, 

95 resources_pb2_grpc, 

96 search_pb2_grpc, 

97 stripe_pb2_grpc, 

98 threads_pb2_grpc, 

99) 

100 

101 

def drop_all():
    """
    Wipe the database: drop the public and logging schemas and the postgis extension, then recreate
    the empty schemas and the extensions
    """
    # extensions that get created again:
    # - postgis is required for all the Geographic Information System (GIS) stuff
    # - pg_trgm is required for trigram based search
    # - btree_gist is required for gist-based exclusion constraints
    reset_statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA public;",
        "CREATE SCHEMA logging;",
        "CREATE EXTENSION postgis;",
        "CREATE EXTENSION pg_trgm;",
        "CREATE EXTENSION btree_gist;",
    )

    # all statements are sent to the database as one string
    with session_scope() as session:
        session.execute(text("".join(reset_statements)))

    # reset the database connection pool: it caches some stuff postgres-side about objects, and without this we
    # sometimes get errors such as "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
    _get_base_engine().dispose()

    close_all_sessions()

127 

128 

def create_schema_from_models():
    """
    Build the whole schema in one go from the current model definitions,
    rather than step by step through the migrations.
    """
    # create the slugify function from the SQL file that sits next to this module
    slugify_sql_path = Path(__file__).parent / "slugify.sql"
    with open(slugify_sql_path) as sql_file, session_scope() as session:
        slugify_sql = sql_file.read()
        session.execute(text(slugify_sql))

    engine = _get_base_engine()
    Base.metadata.create_all(engine)

141 

142 

def populate_testing_resources(session):
    """
    Load the resources used in tests into the database through the given session: a fixed list of
    regions and languages, plus the fake timezone areas SQL

    This is the testing version of couchers.resources.copy_resources_to_database
    """
    # (code, name) pairs
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    # (code, name) pairs
    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # read the SQL up front, before anything is added to the session
    tz_sql_path = Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake"
    with open(tz_sql_path, "r") as tz_file:
        tz_sql = tz_file.read()

    for region_code, region_name in regions:
        region = Region(code=region_code, name=region_name)
        session.add(region)

    for language_code, language_name in languages:
        language = Language(code=language_code, name=language_name)
        session.add(language)

    session.execute(text(tz_sql))

211 

212 

def recreate_database():
    """
    Reset a running Postgres database to a freshly built state: wipe it, build the schema using
    metadata.create_all(), and load the testing resources
    """
    # deliberately not UTC: running in another timezone catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # 1. get rid of whatever is in the database right now
    drop_all()

    # 2. build the schema straight from the current models (not incrementally through migrations)
    create_schema_from_models()

    # 3. fill in the regions, languages and timezone areas used by tests
    with session_scope() as db_session:
        populate_testing_resources(db_session)

229 

230 

@pytest.fixture()
def db():
    """
    Pytest fixture that rebuilds the database from scratch on a running Postgres instance

    All the work is done by recreate_database(); the fixture itself provides no value to the test
    """
    recreate_database()

238 

239 

def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs):
    """
    Create a new user together with a login session, and return them as a `(user, token)` tuple

    Any extra keyword arguments override the default values passed to `User`.

    If `delete_user` is set the user is marked as deleted after the session has been created. If
    `complete_profile` is set the user gets an avatar and a long "about me" text. If `strong_verification` is
    set a succeeded strong verification attempt is stored for the user.

    The returned user is detached from any session, so you can access its static attributes, but you can't
    modify it

    Use this most of the time
    """
    # NOTE(review): this instance is not used anywhere below
    auth = Auth()

    with session_scope() as session:
        username = "test_user_" + random_hex(16)

        # default args
        default_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # the hash is hardcoded because hashing the password is slow (and would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "Woman",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # values supplied by the caller win over the defaults
        user_opts = {**default_opts, **kwargs}

        user = User(**user_opts)
        session.add(user)
        session.flush()

        for visited_code in ("CHE", "REU", "FIN"):
            session.add(RegionVisited(user_id=user.id, region_code=visited_code))

        for lived_code in ("ESP", "FRA", "EST"):
            session.add(RegionLived(user_id=user.id, region_code=lived_code))

        for language_code, fluency in (("fin", LanguageFluency.fluent), ("fra", LanguageFluency.beginner)):
            session.add(LanguageAbility(user_id=user.id, language_code=language_code, fluency=fluency))

        # committing expires the user, so now it's "dirty"
        session.commit()

        class _MockCouchersContext:
            # minimal stand-in for the context that create_session expects: it only provides empty headers
            @property
            def headers(self):
                return {}

        token, _ = create_session(_MockCouchersContext(), session, user, False, set_cookie=False)

        # a deleted user aborts session creation, so deletion has to come after it (hence the second commit below)
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            avatar_key = random_hex(32)
            avatar_filename = random_hex(32) + ".jpg"
            avatar_upload = Upload(
                key=avatar_key,
                filename=avatar_filename,
                creator_user_id=user.id,
            )
            session.add(avatar_upload)
            session.flush()
            user.avatar_key = avatar_key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            sex_by_gender = {"Woman": PassportSex.female, "Man": PassportSex.male}
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex=sex_by_gender.get(user.gender, PassportSex.unspecified),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refreshing undoes the expiry caused by the commit
        session.refresh(user)

        # the user's timezone info is lazy loaded; load it now, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # detach the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token

368 

369 

def get_user_id_and_token(session, username):
    """
    Look up the user with the given username and return `(user_id, token)`, where the token comes from
    that user's session

    Both lookups use scalar_one(), so there must be exactly one matching user and exactly one session for them
    """
    matching_user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_id = matching_user.id
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user_id)).scalar_one()
    return user_id, user_session.token

374 

375 

def make_friends(user1, user2):
    """
    Store an accepted friend relationship going from user1 to user2
    """
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )

384 

385 

def make_user_block(user1, user2):
    """
    Store a block where user1 is the blocking user and user2 is the blocked user
    """
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()

394 

395 

def make_user_invisible(user_id):
    """
    Mark the user with the given id as banned
    """
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True

399 

400 

# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Look up the friend relationship between two users, in either direction

    Returns the FriendRelationship detached from its session, or None if there is no relationship between
    the two users. Raises (through scalar_one_or_none) if more than one row matches.
    """
    with session_scope() as session:
        # the two column comparisons of each direction are combined with `&` so that both end up in the SQL;
        # python's `and` does not build a SQL conjunction, it just picks one of its two operands
        user1_to_user2 = (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id)
        user2_to_user1 = (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id)

        friend_relationship = session.execute(
            select(FriendRelationship).where(or_(user1_to_user2, user2_to_user1))
        ).scalar_one_or_none()

        # there is nothing to detach when the users have no relationship
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

415 

416 

def add_users_to_new_moderation_list(users):
    """
    Group users as duplicated accounts

    Creates a new moderation user list, adds each of the given users to it, and returns the id of the list
    """
    with session_scope() as session:
        new_list = ModerationUserList()
        session.add(new_list)
        session.flush()
        for given_user in users:
            # fetch the user through this session before adding it to the list
            list_member = session.get(User, given_user.id)
            new_list.users.append(list_member)
        return new_list.id

427 

428 

class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Auth metadata plugin that adds the session token to the call metadata as a
    `cookie: couchers-sesh=...` header
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        cookie_header = ("cookie", f"couchers-sesh={self.token}")
        # second argument is the error, there is none here
        callback((cookie_header,), None)

439 

440 

@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields a tuple `(auth_stub, metadata_interceptor)`. After each unary call the interceptor holds the
    initial metadata of that call, as a dict in `latest_headers` and as returned by grpc in
    `latest_header_raw`. The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:

                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    def __init__(self):
                        self.latest_headers = {}
                        # set up front (like latest_headers) so that reading it before the first call doesn't
                        # raise an AttributeError
                        self.latest_header_raw = ()

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        # fetch the metadata once and store it in both forms
                        initial_metadata = call.initial_metadata()
                        self.latest_headers = dict(initial_metadata)
                        self.latest_header_raw = initial_metadata
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            server.stop(None).wait()

474 

475 

@contextmanager
def api_session(token):
    """
    Yield an API stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake)
    api_stub = api_pb2_grpc.APIStub(fake)
    yield api_stub

484 

485 

@contextmanager
def real_api_session(token):
    """
    Yield an API stub for testing that talks to a real grpc server over TCP sockets

    The token is sent along with each call as a session cookie. The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # port 0: the actual port is picked when binding, and returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as real_channel:
                yield api_pb2_grpc.APIStub(real_channel)
        finally:
            grpc_server.stop(None).wait()

505 

506 

@contextmanager
def real_admin_session(token):
    """
    Yield an Admin stub for testing that talks to a real grpc server over TCP sockets

    The token is sent along with each call as a session cookie. The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # port 0: the actual port is picked when binding, and returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as real_channel:
                yield admin_pb2_grpc.AdminStub(real_channel)
        finally:
            grpc_server.stop(None).wait()

526 

527 

@contextmanager
def real_account_session(token):
    """
    Yield an Account stub for testing that talks to a real grpc server over TCP sockets

    The token is sent along with each call as a session cookie. The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # port 0: the actual port is picked when binding, and returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as real_channel:
                yield account_pb2_grpc.AccountStub(real_channel)
        finally:
            grpc_server.stop(None).wait()

547 

548 

@contextmanager
def real_jail_session(token):
    """
    Yield a Jail stub for testing that talks to a real grpc server over TCP sockets

    The token is sent along with each call as a session cookie. The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # port 0: the actual port is picked when binding, and returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as real_channel:
                yield jail_pb2_grpc.JailStub(real_channel)
        finally:
            grpc_server.stop(None).wait()

568 

569 

@contextmanager
def gis_session(token):
    """
    Yield a GIS stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake)
    gis_stub = gis_pb2_grpc.GISStub(fake)
    yield gis_stub

575 

576 

@contextmanager
def public_session():
    """
    Yield a Public stub for testing, running on a fake channel without any token
    """
    fake = fake_channel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake)
    public_stub = public_pb2_grpc.PublicStub(fake)
    yield public_stub

582 

583 

class FakeRpcError(grpc.RpcError):
    """
    An RpcError that simply carries the status code and details it was created with, exposing them
    through code() and details()
    """

    def __init__(self, code, details):
        self._code, self._details = code, details

    def code(self):
        """Return the status code this error was created with"""
        return self._code

    def details(self):
        """Return the details this error was created with"""
        return self._details

594 

595 

def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """
    Assert that a caller with the given details may call `method`, going by the auth level annotation
    of the service the method belongs to

    `method` is of the form "/org.couchers.api.core.API/GetUser". `token_expiry` is accepted but not looked at.

    Raises AssertionError if the call is not allowed.
    """
    # splitting on "/" gives an empty string first, then the service and the method
    _, service_name, _method_name = method.split("/")

    service_descriptor = get_descriptor_pool().FindServiceByName(service_name)
    auth_level = service_descriptor.GetOptions().Extensions[annotations_pb2.auth_level]

    # the service has to be annotated with one of the known auth levels
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    # without a user, only open services may be called
    if not user_id:
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
        return

    assert auth_level != annotations_pb2.AUTH_LEVEL_ADMIN or is_superuser, "Non-superuser tried to call superuser API"
    assert not is_jailed or auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
    ], "User is jailed but tried to call non-open/non-jailed API"

619 

620 

class FakeChannel:
    """
    In-process stand-in used in place of a grpc server and channel

    Servicers get registered on it through add_generic_rpc_handlers(), and stubs get their callables from
    unary_unary(). Calls are dispatched directly to the registered handler, with this object passed along as
    the context.
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        # maps method uri to the registered method handler
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def is_logged_in(self):
        """Return whether this channel was created with a user id"""
        return self.user_id is not None

    def abort(self, code, details):
        """Raise a FakeRpcError with the given code and details"""
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Validate the given handlers and register the method handlers of the first one"""
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        first_handler = generic_rpc_handlers[0]
        self.handlers.update(first_handler._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        """
        Return a callable that invokes the handler registered for `uri`

        The permission check for the method happens right here, not when the returned callable is invoked.
        """
        handler = self.handlers[uri]

        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Both the request and the response go through a full serialization
            # cycle, to catch accidental use of unserializable data.
            serialized_request = request_serializer(request)
            request = handler.request_deserializer(serialized_request)

            with session_scope() as session:
                response = handler.unary_unary(request, self, session)

            serialized_response = handler.response_serializer(response)
            return response_deserializer(serialized_response)

        return fake_handler

658 

659 

def fake_channel(token=None):
    """
    Create a FakeChannel, set up with the details of the user the token belongs to

    Without a token (or with an empty one), a FakeChannel without any user details is returned.
    """
    if not token:
        return FakeChannel()

    user_id, is_jailed, is_superuser, token_expiry, ui_language_preference = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    # ui_language_preference is not passed on to the channel
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)

667 

668 

@contextmanager
def conversations_session(token):
    """
    Yield a Conversations stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake)
    conversations_stub = conversations_pb2_grpc.ConversationsStub(fake)
    yield conversations_stub

677 

678 

@contextmanager
def requests_session(token):
    """
    Yield a Requests stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake)
    requests_stub = requests_pb2_grpc.RequestsStub(fake)
    yield requests_stub

687 

688 

@contextmanager
def threads_session(token):
    """
    Yield a Threads stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake)
    threads_stub = threads_pb2_grpc.ThreadsStub(fake)
    yield threads_stub

694 

695 

@contextmanager
def discussions_session(token):
    """
    Yield a Discussions stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake)
    discussions_stub = discussions_pb2_grpc.DiscussionsStub(fake)
    yield discussions_stub

701 

702 

@contextmanager
def donations_session(token):
    """
    Yield a Donations stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake)
    donations_stub = donations_pb2_grpc.DonationsStub(fake)
    yield donations_stub

708 

709 

@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub for testing that talks to a real grpc server over TCP sockets

    No token is sent with the calls. The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # port 0: the actual port is picked when binding, and returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), grpc_server)
        grpc_server.start()

        channel_creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as real_channel:
                yield stripe_pb2_grpc.StripeStub(real_channel)
        finally:
            grpc_server.stop(None).wait()

728 

729 

@contextmanager
def real_iris_session():
    """
    Yield an Iris stub for testing that talks to a real grpc server on a localhost port

    No token is sent with the calls. The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # port 0: the actual port is picked when binding, and returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), grpc_server)
        grpc_server.start()

        channel_creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as real_channel:
                yield iris_pb2_grpc.IrisStub(real_channel)
        finally:
            grpc_server.stop(None).wait()

745 

746 

@contextmanager
def pages_session(token):
    """
    Yield a Pages stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake)
    pages_stub = pages_pb2_grpc.PagesStub(fake)
    yield pages_stub

752 

753 

@contextmanager
def communities_session(token):
    """
    Yield a Communities stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake)
    communities_stub = communities_pb2_grpc.CommunitiesStub(fake)
    yield communities_stub

759 

760 

@contextmanager
def groups_session(token):
    """
    Yield a Groups stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake)
    groups_stub = groups_pb2_grpc.GroupsStub(fake)
    yield groups_stub

766 

767 

@contextmanager
def blocking_session(token):
    """
    Yield a Blocking stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake)
    blocking_stub = blocking_pb2_grpc.BlockingStub(fake)
    yield blocking_stub

773 

774 

@contextmanager
def notifications_session(token):
    """
    Yield a Notifications stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake)
    notifications_stub = notifications_pb2_grpc.NotificationsStub(fake)
    yield notifications_stub

780 

781 

@contextmanager
def account_session(token):
    """
    Yield an Account stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake)
    account_stub = account_pb2_grpc.AccountStub(fake)
    yield account_stub

790 

791 

@contextmanager
def search_session(token):
    """
    Yield a Search stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake)
    search_stub = search_pb2_grpc.SearchStub(fake)
    yield search_stub

800 

801 

@contextmanager
def references_session(token):
    """
    Yield a References stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake)
    references_stub = references_pb2_grpc.ReferencesStub(fake)
    yield references_stub

810 

811 

@contextmanager
def reporting_session(token):
    """
    Yield a Reporting stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake)
    reporting_stub = reporting_pb2_grpc.ReportingStub(fake)
    yield reporting_stub

817 

818 

@contextmanager
def events_session(token):
    """
    Yield an Events stub for testing, running on a fake channel that uses the token for auth
    """
    fake = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake)
    events_stub = events_pb2_grpc.EventsStub(fake)
    yield events_stub

824 

825 

@contextmanager
def bugs_session(token=None):
    """
    Yield a Bugs stub for testing, running on a fake channel

    The token is optional; when given, it is used for auth.
    """
    fake = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake)
    bugs_stub = bugs_pb2_grpc.BugsStub(fake)
    yield bugs_stub

831 

832 

@contextmanager
def resources_session():
    """
    Yield a Resources stub for testing, running on a fake channel without any token
    """
    fake = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake)
    resources_stub = resources_pb2_grpc.ResourcesStub(fake)
    yield resources_stub

838 

839 

@contextmanager
def media_session(bearer_token):
    """
    Yield a Media stub for testing, talking to a fresh grpc server on a localhost port

    The bearer token is used for media auth: the server's auth interceptor is built from it, and the
    client sends it as access token call credentials. The server is stopped when the context exits.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[auth_interceptor])
        # port 0: the actual port is picked when binding, and returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_servicer = Media()
        media_pb2_grpc.add_MediaServicer_to_server(media_servicer, grpc_server)
        grpc_server.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as real_channel:
                yield media_pb2_grpc.MediaStub(real_channel)
        finally:
            grpc_server.stop(None).wait()

862 

863 

@pytest.fixture(scope="class")
def testconfig():
    """
    Class-scoped fixture that puts the testing values into the global config, and restores the previous
    config contents afterwards

    The fixture yields None; tests read the values from `config` directly.
    """
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    testing_values = {
        "IN_TEST": True,
        "DEV": True,
        "SECRET": bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b"),
        "VERSION": "testing_version",
        "BASE_URL": "http://localhost:3000",
        "BACKEND_BASE_URL": "http://localhost:8888",
        "CONSOLE_BASE_URL": "http://localhost:8888",
        "COOKIE_DOMAIN": "localhost",
        # sms
        "ENABLE_SMS": False,
        "SMS_SENDER_ID": "invalid",
        # email
        "ENABLE_EMAIL": False,
        "NOTIFICATION_EMAIL_SENDER": "Couchers.org",
        "NOTIFICATION_EMAIL_ADDRESS": "notify@couchers.org.invalid",
        "NOTIFICATION_PREFIX": "[TEST] ",
        "REPORTS_EMAIL_RECIPIENT": "reports@couchers.org.invalid",
        "CONTRIBUTOR_FORM_EMAIL_RECIPIENT": "forms@couchers.org.invalid",
        "MODS_EMAIL_RECIPIENT": "mods@couchers.org.invalid",
        # donations
        "ENABLE_DONATIONS": False,
        "STRIPE_API_KEY": "",
        "STRIPE_WEBHOOK_SECRET": "",
        "STRIPE_RECURRING_PRODUCT_ID": "",
        # strong verification
        "ENABLE_STRONG_VERIFICATION": False,
        "IRIS_ID_PUBKEY": "",
        "IRIS_ID_SECRET": "",
        # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
        "VERIFICATION_DATA_PUBLIC_KEY": bytes.fromhex(
            "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
        ),
        # smtp
        "SMTP_HOST": "localhost",
        "SMTP_PORT": 587,
        "SMTP_USERNAME": "username",
        "SMTP_PASSWORD": "password",
        # media
        "ENABLE_MEDIA": True,
        "MEDIA_SERVER_SECRET_KEY": bytes.fromhex(
            "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
        ),
        "MEDIA_SERVER_BEARER_TOKEN": "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e",
        "MEDIA_SERVER_BASE_URL": "http://localhost:5001",
        "MEDIA_SERVER_UPLOAD_BASE_URL": "http://localhost:5001",
        # bug tool
        "BUG_TOOL_ENABLED": False,
        "BUG_TOOL_GITHUB_REPO": "org/repo",
        "BUG_TOOL_GITHUB_USERNAME": "user",
        "BUG_TOOL_GITHUB_TOKEN": "token",
        # listmonk
        "LISTMONK_ENABLED": False,
        "LISTMONK_BASE_URL": "https://localhost",
        "LISTMONK_API_USERNAME": "...",
        "LISTMONK_API_KEY": "...",
        "LISTMONK_LIST_ID": 3,
        # push notifications
        "PUSH_NOTIFICATIONS_ENABLED": True,
        "PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY": "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI",
        "PUSH_NOTIFICATIONS_VAPID_SUBJECT": "mailto:testing@couchers.org.invalid",
        # activeness probes
        "ACTIVENESS_PROBES_ENABLED": True,
        # recaptcha (the key names are spelled this way on purpose, they are kept exactly as they were)
        "RECAPTHCA_ENABLED": False,
        "RECAPTHCA_PROJECT_ID": "...",
        "RECAPTHCA_API_KEY": "...",
        "RECAPTHCA_SITE_KEY": "...",
    }

    # set the values one by one, in the order listed above
    for config_key, testing_value in testing_values.items():
        config[config_key] = testing_value

    yield None

    # put back whatever was in the config before
    config.clear()
    config.update(prevconfig)

943 

944 

def run_migration_test():
    """
    Report whether the migration test has been requested through the environment.

    Returns True only when the RUN_MIGRATION_TEST environment variable equals "true" (compared
    case-insensitively); an unset variable or any other value gives False.
    """
    requested = os.environ.get("RUN_MIGRATION_TEST", "false")
    return requested.lower() == "true"

947 

948 

@pytest.fixture
def fast_passwords():
    """
    Replace the nacl.pwhash functions reached through couchers.crypto with trivial stand-ins.

    Password hashing is slow on purpose, which drags the tests down. While this fixture is active,
    "hashing" just prepends a fixed marker to the password and verification recomputes that value and
    compares, so no real hashing work is done.
    """
    marker = b"fake hash:"

    def instant_hash(password: bytes) -> bytes:
        return marker + password

    def instant_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == instant_hash(password)

    # entered left to right: verify is patched first, then str, and they are undone in reverse order
    verify_patch = patch("couchers.crypto.nacl.pwhash.verify", instant_verify)
    hash_patch = patch("couchers.crypto.nacl.pwhash.str", instant_hash)
    with verify_patch, hash_patch:
        yield

963 

964 

def process_jobs():
    """
    Call process_job() over and over, stopping at the first falsy return value.
    """
    keep_going = process_job()
    while keep_going:
        keep_going = process_job()

968 

969 

@contextmanager
def mock_notification_email():
    """
    Context manager that patches couchers.email._queue_email with a mock and yields that mock.

    When the managed block finishes without raising, process_jobs() is run while the patch is still in
    place. If the managed block raises, the exception propagates and process_jobs() is not run.
    """
    queue_email_patch = patch("couchers.email._queue_email")
    with queue_email_patch as queue_email:
        yield queue_email
        # still inside the patch, so work done by the jobs goes through the mock
        process_jobs()

975 

976 

@dataclass
class EmailData:
    """
    Plain record of the fields of one queued email, as extracted by email_fields() from the keyword
    arguments of a mocked call.
    """

    # who the email is from
    sender_name: str
    sender_email: str

    # who it goes to
    recipient: str

    # content
    subject: str
    plain: str
    html: str

    # extra data passed along with the email
    source_data: str
    list_unsubscribe_header: str

987 

988 

def email_fields(mock, call_ix=0):
    """
    Build an EmailData from the keyword arguments of one recorded call on `mock`.

    `call_ix` indexes into mock.call_args_list (default: the first call). Any field that was not passed
    as a keyword argument in that call comes back as None.
    """
    _positional, call_kwargs = mock.call_args_list[call_ix]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    return EmailData(**{name: call_kwargs.get(name) for name in field_names})

1001 

1002 

@pytest.fixture
def push_collector():
    """
    Fixture that records push notifications instead of delivering them.

    While the fixture is active, couchers.notifications.push._push_to_user is patched with the
    collector's push_to_user, so each call is stored as a (user_id, Push) pair that the test can
    inspect through the yielded PushCollector.

    See test_SendTestPushNotification for an example on how to use this fixture
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            # Only called after normal attribute lookup has failed. Read kwargs through the instance
            # __dict__: going through self.kwargs would re-enter __getattr__ and recurse without end
            # whenever kwargs has not been set yet (e.g. copy.copy() probing a freshly created
            # instance for __setstate__).
            try:
                return self.__dict__["kwargs"][attr]
            except KeyError:
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        """
        Collects the pushes captured by the patch and offers assertion helpers over them.
        """

        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            """Return the Push objects recorded for `user_id`, in the order they were captured."""
            return [push for uid, push in self.pushes if uid == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            """Stand-in for _push_to_user: store the push; `session` is accepted but not used."""
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            """Assert that exactly `count` pushes were recorded for `user_id`."""
            user_pushes = self.by_user(user_id)
            assert len(user_pushes) == count, (
                f"Expected {count} push notification(s) for {user_id=} but got {len(user_pushes)}: {user_pushes}"
            )

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            """
            Assert that the `ix`-th push recorded for `user_id` has every given field with an equal value.

            Fields of the push that are not named in `kwargs` are not checked.
            """
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert push.kwargs[kwarg] == kwargs[kwarg], (
                    f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"
                )

        def assert_user_has_single_matching(self, user_id, **kwargs):
            """Assert that `user_id` received exactly one push and that it matches the given fields."""
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector