Coverage for src/tests/test_fixtures.py: 96%

495 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 16:44 +0000

1import os 

2from concurrent import futures 

3from contextlib import contextmanager 

4from dataclasses import dataclass 

5from datetime import date 

6from pathlib import Path 

7from unittest.mock import patch 

8 

9import grpc 

10import pytest 

from sqlalchemy.sql import and_, or_, text

12 

13from couchers.config import config 

14from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

15from couchers.crypto import random_hex 

16from couchers.db import clear_base_engine_cache, get_engine, session_scope 

17from couchers.descriptor_pool import get_descriptor_pool 

18from couchers.interceptors import AuthValidatorInterceptor, _try_get_and_update_user_details 

19from couchers.jobs.worker import process_job 

20from couchers.models import ( 

21 Base, 

22 FriendRelationship, 

23 FriendStatus, 

24 HostingStatus, 

25 Language, 

26 LanguageAbility, 

27 LanguageFluency, 

28 MeetupStatus, 

29 Region, 

30 RegionLived, 

31 RegionVisited, 

32 Upload, 

33 User, 

34 UserBlock, 

35 UserSession, 

36) 

37from couchers.servicers.account import Account, Iris 

38from couchers.servicers.admin import Admin 

39from couchers.servicers.api import API 

40from couchers.servicers.auth import Auth, create_session 

41from couchers.servicers.blocking import Blocking 

42from couchers.servicers.bugs import Bugs 

43from couchers.servicers.communities import Communities 

44from couchers.servicers.conversations import Conversations 

45from couchers.servicers.discussions import Discussions 

46from couchers.servicers.donations import Donations, Stripe 

47from couchers.servicers.events import Events 

48from couchers.servicers.groups import Groups 

49from couchers.servicers.jail import Jail 

50from couchers.servicers.media import Media, get_media_auth_interceptor 

51from couchers.servicers.notifications import Notifications 

52from couchers.servicers.pages import Pages 

53from couchers.servicers.references import References 

54from couchers.servicers.reporting import Reporting 

55from couchers.servicers.requests import Requests 

56from couchers.servicers.resources import Resources 

57from couchers.servicers.search import Search 

58from couchers.servicers.threads import Threads 

59from couchers.sql import couchers_select as select 

60from couchers.utils import create_coordinate, now 

61from proto import ( 

62 account_pb2_grpc, 

63 admin_pb2_grpc, 

64 annotations_pb2, 

65 api_pb2_grpc, 

66 auth_pb2_grpc, 

67 blocking_pb2_grpc, 

68 bugs_pb2_grpc, 

69 communities_pb2_grpc, 

70 conversations_pb2_grpc, 

71 discussions_pb2_grpc, 

72 donations_pb2_grpc, 

73 events_pb2_grpc, 

74 groups_pb2_grpc, 

75 iris_pb2_grpc, 

76 jail_pb2_grpc, 

77 media_pb2_grpc, 

78 notifications_pb2_grpc, 

79 pages_pb2_grpc, 

80 references_pb2_grpc, 

81 reporting_pb2_grpc, 

82 requests_pb2_grpc, 

83 resources_pb2_grpc, 

84 search_pb2_grpc, 

85 stripe_pb2_grpc, 

86 threads_pb2_grpc, 

87) 

88 

89 

def drop_all():
    """Wipe both schemas and recreate the extensions the app relies on."""
    # postgis is required for all the Geographic Information System (GIS) stuff
    # pg_trgm is required for trigram based search
    # btree_gist is required for gist-based exclusion constraints
    reset_sql = "DROP SCHEMA public CASCADE; DROP SCHEMA IF EXISTS logging CASCADE; CREATE SCHEMA public; CREATE SCHEMA logging; CREATE EXTENSION postgis; CREATE EXTENSION pg_trgm; CREATE EXTENSION btree_gist;"
    with session_scope() as session:
        session.execute(text(reset_sql))

101 

102 

def create_schema_from_models():
    """
    Build the whole schema directly from the current SQLAlchemy models,
    rather than replaying migrations incrementally.
    """
    # the slugify SQL function must exist before the tables that use it
    slugify_sql_path = Path(__file__).parent / "slugify.sql"
    with open(slugify_sql_path) as sql_file, session_scope() as session:
        session.execute(text(sql_file.read()))

    Base.metadata.create_all(get_engine())

115 

116 

def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database

    Inserts a small fixed set of regions and languages, plus a fake
    timezone-areas table, instead of the full production resource data.
    """
    # (ISO 3166-1 alpha-3 code, display name) pairs
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    # (ISO 639-3 code, display name) pairs
    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # fake timezone areas: the real data set is large, so tests use a stub
    with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f:
        tz_sql = f.read()

    for code, name in regions:
        session.add(Region(code=code, name=name))

    for code, name in languages:
        session.add(Language(code=code, name=name))

    session.execute(text(tz_sql))

185 

186 

def recreate_database():
    """
    Connect to a running Postgres database, build it using metadata.create_all()

    Drops whatever is there first, then creates the schema from the models and
    populates the testing resources. Order matters: the engine cache must be
    cleared between dropping and recreating.
    """

    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # drop everything currently in the database
    drop_all()
    clear_base_engine_cache()  # to address errors like sqlalchemy.exc.InternalError: (psycopg2.errors.InternalError_) no spatial operator found for 'st_dwithin'

    # create everything from the current models, not incrementally through migrations
    create_schema_from_models()

    with session_scope() as session:
        populate_testing_resources(session)

204 

205 

@pytest.fixture()
def db():
    """
    Pytest fixture to connect to a running Postgres database and build it using metadata.create_all()

    Function-scoped: each test that requests it gets a freshly rebuilt database.
    """

    recreate_database()

213 

214 

def generate_user(*, delete_user=False, complete_profile=False, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Args:
        delete_user: mark the user as deleted after the session is created
            (creating a session for an already-deleted user would abort)
        complete_profile: give the user an avatar and a long about_me so that
            has_completed_profile is True
        **kwargs: overrides for any User column default below

    Returns:
        (user, token): the detached User object and a session token
    """
    auth = Auth()  # NOTE(review): appears unused — presumably kept for its constructor side effects, confirm

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "N/A",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "my_travels": "Places",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
        }

        # caller-supplied values win over the defaults above
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        session.flush()

        # give the user some regions/languages so profile-related tests have data
        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        class _DummyContext:
            # minimal grpc-context stand-in: create_session only needs invocation_metadata
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        # deterministic, descending-by-id recommendation scores
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 10

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # this detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token

320 

321 

def get_user_id_and_token(session, username):
    """Look up a user's id and a session token for them, by username."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token

326 

327 

def make_friends(user1, user2):
    """Create an accepted friend relationship between the two users."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )

336 

337 

def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()

346 

347 

def make_user_invisible(user_id):
    """Ban the given user so they no longer show up anywhere."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True

351 

352 

# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between the two users (in either direction),
    detached from the session, or None if there isn't one.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # must use SQL and_(), not the python `and` operator: `a and b` evaluates
                    # in python and silently reduces to just `b`, breaking the filter
                    and_(FriendRelationship.from_user_id == user1.id, FriendRelationship.to_user_id == user2.id),
                    and_(FriendRelationship.from_user_id == user2.id, FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # expunging None would raise; just pass the "not found" result through
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

367 

368 

class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token):
        # session token to send as a cookie on every call
        self.token = token

    def __call__(self, context, callback):
        # callback(metadata, error): supply the cookie header, no error
        callback((("cookie", f"couchers-sesh={self.token}"),), None)

379 

380 

@contextmanager
def auth_api_session():
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields (auth_stub, metadata_interceptor): the interceptor records the
    response headers of the most recent call so tests can inspect cookies.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:

                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    def __init__(self):
                        # headers from the most recent call, for cookie inspection
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        self.latest_headers = dict(call.initial_metadata())
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                # rebind the channel so all calls go through the interceptor
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            server.stop(None).wait()

411 

412 

@contextmanager
def api_session(token):
    """
    Yield an API stub for testing, authenticated with the given token
    """
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    yield api_pb2_grpc.APIStub(chan)

421 

422 

@contextmanager
def real_api_session(token):
    """
    Yield an API stub over real TCP sockets, authenticated with the given token
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        # local channel creds + the session cookie on every call
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            server.stop(None).wait()

442 

443 

@contextmanager
def real_admin_session(token):
    """
    Yield an Admin stub over real TCP sockets, authenticated with the given token
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        # local channel creds + the session cookie on every call
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            server.stop(None).wait()

463 

464 

@contextmanager
def real_account_session(token):
    """
    Yield an Account stub over real TCP sockets, authenticated with the given token
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
        server.start()

        # local channel creds + the session cookie on every call
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            server.stop(None).wait()

484 

485 

@contextmanager
def real_jail_session(token):
    """
    Yield a Jail stub over real TCP sockets, authenticated with the given token
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        # local channel creds + the session cookie on every call
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            server.stop(None).wait()

505 

506 

class FakeRpcError(grpc.RpcError):
    """Minimal RpcError raised by FakeChannel.abort, exposing code() and details()."""

    def __init__(self, code, details):
        self._code = code
        self._details = details

    def code(self):
        return self._code

    def details(self):
        return self._details

517 

518 

def _check_user_perms(method, user_id, is_jailed, is_superuser):
    """
    Assert that the calling user (or anonymous caller, when user_id is falsy) is
    allowed to invoke the given method, based on the service's auth_level option.
    """
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, method_name = method.split("/")

    options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = options.Extensions[annotations_pb2.auth_level]

    # every service must declare a known auth level
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    if not user_id:
        # anonymous callers may only hit open APIs
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
        return

    assert not (
        auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser
    ), "Non-superuser tried to call superuser API"
    assert not (
        is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED]
    ), "User is jailed but tried to call non-open/non-jailed API"

542 

543 

class FakeChannel:
    """
    In-process stand-in for a grpc server/channel pair.

    Servicers register handlers on it via add_*Servicer_to_server, and stubs
    created against it call handlers directly (no sockets), with a full
    request/response serialization round-trip to catch unserializable data.
    It also doubles as the servicer context (hence abort()).
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None):
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser

    def abort(self, code, details):
        # mimic grpc context.abort: raise out of the handler
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        # NOTE: reaches into grpc private internals to validate/extract handlers
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        handler = self.handlers[uri]

        # enforce the service's auth_level annotation just like the real interceptor would
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            # `self` acts as the servicer context
            response = handler.unary_unary(request, self)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler

576 

577 

def fake_channel(token=None):
    """Build a FakeChannel, resolving auth details from the token when given."""
    if not token:
        return FakeChannel()
    user_id, is_jailed, is_superuser = _try_get_and_update_user_details(token, is_api_key=False)
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser)

583 

584 

@contextmanager
def conversations_session(token):
    """
    Yield a Conversations stub for testing, authenticated with the given token
    """
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    yield conversations_pb2_grpc.ConversationsStub(chan)

593 

594 

@contextmanager
def requests_session(token):
    """
    Yield a Requests stub for testing, authenticated with the given token
    """
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    yield requests_pb2_grpc.RequestsStub(chan)

603 

604 

@contextmanager
def threads_session(token):
    """Yield a Threads stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    yield threads_pb2_grpc.ThreadsStub(chan)

610 

611 

@contextmanager
def discussions_session(token):
    """Yield a Discussions stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    yield discussions_pb2_grpc.DiscussionsStub(chan)

617 

618 

@contextmanager
def donations_session(token):
    """Yield a Donations stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    yield donations_pb2_grpc.DonationsStub(chan)

624 

625 

@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub over real TCP sockets (no auth needed)
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            server.stop(None).wait()

644 

645 

@contextmanager
def real_iris_session():
    """Yield an Iris stub over real TCP sockets (no auth needed)."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            server.stop(None).wait()

661 

662 

@contextmanager
def pages_session(token):
    """Yield a Pages stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    yield pages_pb2_grpc.PagesStub(chan)

668 

669 

@contextmanager
def communities_session(token):
    """Yield a Communities stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    yield communities_pb2_grpc.CommunitiesStub(chan)

675 

676 

@contextmanager
def groups_session(token):
    """Yield a Groups stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    yield groups_pb2_grpc.GroupsStub(chan)

682 

683 

@contextmanager
def blocking_session(token):
    """Yield a Blocking stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    yield blocking_pb2_grpc.BlockingStub(chan)

689 

690 

@contextmanager
def notifications_session(token):
    """Yield a Notifications stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    yield notifications_pb2_grpc.NotificationsStub(chan)

696 

697 

@contextmanager
def account_session(token):
    """
    Yield an Account stub for testing, authenticated with the given token
    """
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    yield account_pb2_grpc.AccountStub(chan)

706 

707 

@contextmanager
def search_session(token):
    """
    Yield a Search stub for testing, authenticated with the given token
    """
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    yield search_pb2_grpc.SearchStub(chan)

716 

717 

@contextmanager
def references_session(token):
    """
    Yield a References stub for testing, authenticated with the given token
    """
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    yield references_pb2_grpc.ReferencesStub(chan)

726 

727 

@contextmanager
def reporting_session(token):
    """Yield a Reporting stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    yield reporting_pb2_grpc.ReportingStub(chan)

733 

734 

@contextmanager
def events_session(token):
    """Yield an Events stub for testing, authenticated with the given token."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    yield events_pb2_grpc.EventsStub(chan)

740 

741 

@contextmanager
def bugs_session(token=None):
    """Yield a Bugs stub for testing; the token is optional since Bugs is open."""
    chan = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    yield bugs_pb2_grpc.BugsStub(chan)

747 

748 

@contextmanager
def resources_session():
    """Yield a Resources stub for testing; no auth needed."""
    chan = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    yield resources_pb2_grpc.ResourcesStub(chan)

754 

755 

@contextmanager
def media_session(bearer_token):
    """
    Yield a fresh Media stub over real TCP sockets, using the bearer token for media auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[get_media_auth_interceptor(bearer_token)])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), server)
        server.start()

        # local channel creds + the bearer token on every call
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.access_token_call_credentials(bearer_token),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            server.stop(None).wait()

778 

779 

@pytest.fixture(scope="class")
def testconfig():
    """
    Class-scoped fixture that overwrites the global config with known testing
    values, then restores the original config afterwards.
    """
    # snapshot the current config so it can be restored after the tests
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config["IN_TEST"] = True

    config["DEV"] = True
    config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
    config["VERSION"] = "testing_version"
    config["BASE_URL"] = "http://localhost:3000"
    config["BACKEND_BASE_URL"] = "http://localhost:8888"
    config["CONSOLE_BASE_URL"] = "http://localhost:8888"
    config["COOKIE_DOMAIN"] = "localhost"

    config["ENABLE_SMS"] = False
    config["SMS_SENDER_ID"] = "invalid"

    config["ENABLE_EMAIL"] = False
    config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
    config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
    config["NOTIFICATION_EMAIL_PREFIX"] = "[TEST] "
    config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
    config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
    config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"

    config["ENABLE_DONATIONS"] = False
    config["STRIPE_API_KEY"] = ""
    config["STRIPE_WEBHOOK_SECRET"] = ""
    config["STRIPE_RECURRING_PRODUCT_ID"] = ""

    config["ENABLE_STRONG_VERIFICATION"] = False
    config["IRIS_ID_PUBKEY"] = ""
    config["IRIS_ID_SECRET"] = ""
    # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
    config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    config["SMTP_HOST"] = "localhost"
    config["SMTP_PORT"] = 587
    config["SMTP_USERNAME"] = "username"
    config["SMTP_PASSWORD"] = "password"

    config["ENABLE_MEDIA"] = True
    config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
        "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
    )
    config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
    config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5000"

    config["BUG_TOOL_ENABLED"] = False
    config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
    config["BUG_TOOL_GITHUB_USERNAME"] = "user"
    config["BUG_TOOL_GITHUB_TOKEN"] = "token"

    config["LISTMONK_ENABLED"] = False
    config["LISTMONK_BASE_URL"] = "https://localhost"
    config["LISTMONK_API_KEY"] = "..."
    config["LISTMONK_LIST_UUID"] = "..."

    config["PUSH_NOTIFICATIONS_ENABLED"] = True
    config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
    config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"

    yield None

    # restore the pre-test config
    config.clear()
    config.update(prevconfig)

850 

851 

@pytest.fixture
def fast_passwords():
    """
    Swap out the real (deliberately slow) password hashing for a trivial fake.

    Hashing is slow by design, which would drag down every test that creates
    or logs in a user; this keeps those tests fast.
    """

    def fake_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def fake_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == fake_hash(password)

    with patch("couchers.crypto.nacl.pwhash.verify", fake_verify), patch(
        "couchers.crypto.nacl.pwhash.str", fake_hash
    ):
        yield

866 

867 

def process_jobs():
    """Drain the background job queue, running jobs until none remain."""
    while True:
        if not process_job():
            break

871 

872 

@contextmanager
def mock_notification_email():
    """Patch out email queueing, yield the mock, then drain the job queue."""
    with patch("couchers.email._queue_email") as mocked:
        yield mocked
        process_jobs()

878 

879 

@dataclass
class EmailData:
    """Keyword arguments captured from a single couchers.email._queue_email call."""

    sender_name: str
    sender_email: str
    recipient: str
    subject: str
    plain: str
    html: str
    source_data: str
    list_unsubscribe_header: str

890 

891 

def email_fields(mock, call_ix=0):
    """Extract the call_ix-th queued email from the mock as an EmailData."""
    _, kwargs = mock.call_args_list[call_ix]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    # missing kwargs become None, matching kw.get() semantics
    return EmailData(**{name: kwargs.get(name) for name in field_names})

904 

905 

@pytest.fixture
def push_collector():
    """
    See test_SendTestPushNotification for an example on how to use this fixture

    Patches the push delivery function and yields a collector that records every
    push sent during the test, with assertion helpers.
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            try:
                return self.kwargs[attr]
            except KeyError:
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        """Records (user_id, Push) pairs and provides per-user assertions."""

        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            """Return the list of Push objects sent to the given user, in order."""
            # note: the elements are Push objects despite the local name `kwargs`
            return [kwargs for uid, kwargs in self.pushes if uid == user_id]

        def push_to_user(self, user_id, **kwargs):
            # drop-in replacement for couchers.notifications.push._push_to_user
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            """Assert the ix-th push to user_id contains each given field with the given value."""
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert (
                    push.kwargs[kwarg] == kwargs[kwarg]
                ), f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector