Coverage for src/tests/test_fixtures.py: 99%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

380 statements  

1import os 

2from concurrent import futures 

3from contextlib import contextmanager 

4from datetime import date 

5from pathlib import Path 

6from unittest.mock import patch 

7 

8import grpc 

9import pytest 

10from sqlalchemy.sql import or_, text 

11 

12from couchers.config import config 

13from couchers.constants import GUIDELINES_VERSION, TOS_VERSION 

14from couchers.crypto import random_hex 

15from couchers.db import get_engine, session_scope 

16from couchers.interceptors import AuthValidatorInterceptor, _try_get_and_update_user_details 

17from couchers.models import ( 

18 Base, 

19 FriendRelationship, 

20 FriendStatus, 

21 HostingStatus, 

22 Language, 

23 LanguageAbility, 

24 LanguageFluency, 

25 Region, 

26 RegionLived, 

27 RegionVisited, 

28 User, 

29 UserBlock, 

30 UserSession, 

31) 

32from couchers.servicers.account import Account 

33from couchers.servicers.admin import Admin 

34from couchers.servicers.api import API 

35from couchers.servicers.auth import Auth, create_session 

36from couchers.servicers.blocking import Blocking 

37from couchers.servicers.bugs import Bugs 

38from couchers.servicers.communities import Communities 

39from couchers.servicers.conversations import Conversations 

40from couchers.servicers.discussions import Discussions 

41from couchers.servicers.donations import Donations, Stripe 

42from couchers.servicers.events import Events 

43from couchers.servicers.groups import Groups 

44from couchers.servicers.jail import Jail 

45from couchers.servicers.media import Media, get_media_auth_interceptor 

46from couchers.servicers.notifications import Notifications 

47from couchers.servicers.pages import Pages 

48from couchers.servicers.references import References 

49from couchers.servicers.reporting import Reporting 

50from couchers.servicers.requests import Requests 

51from couchers.servicers.resources import Resources 

52from couchers.servicers.search import Search 

53from couchers.servicers.threads import Threads 

54from couchers.sql import couchers_select as select 

55from couchers.utils import create_coordinate, now 

56from proto import ( 

57 account_pb2_grpc, 

58 admin_pb2_grpc, 

59 api_pb2_grpc, 

60 auth_pb2_grpc, 

61 blocking_pb2_grpc, 

62 bugs_pb2_grpc, 

63 communities_pb2_grpc, 

64 conversations_pb2_grpc, 

65 discussions_pb2_grpc, 

66 donations_pb2_grpc, 

67 events_pb2_grpc, 

68 groups_pb2_grpc, 

69 jail_pb2_grpc, 

70 media_pb2_grpc, 

71 notifications_pb2_grpc, 

72 pages_pb2_grpc, 

73 references_pb2_grpc, 

74 reporting_pb2_grpc, 

75 requests_pb2_grpc, 

76 resources_pb2_grpc, 

77 search_pb2_grpc, 

78 stripe_pb2_grpc, 

79 threads_pb2_grpc, 

80) 

81 

82 

def drop_all():
    """Wipe the database: drop and recreate the schemas, then reinstall the extensions"""
    # Extensions reinstalled once the schemas exist again:
    #   postgis    - needed for all the Geographic Information System (GIS) stuff
    #   pg_trgm    - needed for trigram based search
    #   btree_gist - needed for gist-based exclusion constraints
    reset_sql = text(
        "DROP SCHEMA public CASCADE; "
        "DROP SCHEMA IF EXISTS logging CASCADE; "
        "CREATE SCHEMA public; "
        "CREATE SCHEMA logging; "
        "CREATE EXTENSION postgis; "
        "CREATE EXTENSION pg_trgm; "
        "CREATE EXTENSION btree_gist;"
    )
    with session_scope() as session:
        session.execute(reset_sql)

94 

95 

def create_schema_from_models():
    """
    Build the whole schema in one go from the current models, rather than
    stepping through the migrations.
    """
    # the slugify SQL function has to be installed before the tables are created
    slugify_sql_path = Path(__file__).parent / "slugify.sql"
    with open(slugify_sql_path) as sql_file:
        with session_scope() as session:
            session.execute(text(sql_file.read()))

    engine = get_engine()
    Base.metadata.create_all(engine)

108 

109 

def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database

    Adds a fixed set of regions and languages to the session and runs the fake timezone areas SQL file.
    """
    # region code -> region name
    region_names = {
        "AUS": "Australia",
        "CAN": "Canada",
        "CHE": "Switzerland",
        "CUB": "Cuba",
        "CXR": "Christmas Island",
        "CZE": "Czechia",
        "DEU": "Germany",
        "EGY": "Egypt",
        "ESP": "Spain",
        "EST": "Estonia",
        "FIN": "Finland",
        "FRA": "France",
        "GBR": "United Kingdom",
        "GEO": "Georgia",
        "GHA": "Ghana",
        "GRC": "Greece",
        "HKG": "Hong Kong",
        "IRL": "Ireland",
        "ISR": "Israel",
        "ITA": "Italy",
        "JPN": "Japan",
        "LAO": "Laos",
        "MEX": "Mexico",
        "MMR": "Myanmar",
        "NAM": "Namibia",
        "NLD": "Netherlands",
        "NZL": "New Zealand",
        "POL": "Poland",
        "PRK": "North Korea",
        "REU": "Réunion",
        "SGP": "Singapore",
        "SWE": "Sweden",
        "THA": "Thailand",
        "TUR": "Turkey",
        "TWN": "Taiwan",
        "USA": "United States",
        "VNM": "Vietnam",
    }

    # language code -> language name
    language_names = {
        "arb": "Arabic (Standard)",
        "deu": "German",
        "eng": "English",
        "fin": "Finnish",
        "fra": "French",
        "heb": "Hebrew",
        "hun": "Hungarian",
        "jpn": "Japanese",
        "pol": "Polish",
        "swe": "Swedish",
        "cmn": "Chinese (Mandarin)",
    }

    # read the SQL up front, so a missing file fails before anything is added to the session
    tz_sql_path = Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake"
    with tz_sql_path.open("r") as tz_file:
        tz_sql = tz_file.read()

    for region_code, region_name in region_names.items():
        session.add(Region(code=region_code, name=region_name))

    for language_code, language_name in language_names.items():
        session.add(Language(code=language_code, name=language_name))

    session.execute(text(tz_sql))

178 

179 

def recreate_database():
    """
    Rebuild the running Postgres database from scratch: drop everything, create the schema with
    metadata.create_all(), then load the testing resources
    """

    # deliberately not UTC, running in a non-UTC timezone catches some timezone errors
    # NOTE(review): only the environment variable is set here, time.tzset() is not called -- confirm that is enough
    # for the timezone to take effect everywhere it is meant to
    os.environ.update({"TZ": "America/New_York"})

    # start from an empty database...
    drop_all()

    # ...and build it directly from the current models, not incrementally through migrations
    create_schema_from_models()

    with session_scope() as resources_session:
        populate_testing_resources(resources_session)

196 

197 

@pytest.fixture()
def db():
    """
    Pytest fixture giving the test a freshly rebuilt database

    Needs a running Postgres; all the work is done by recreate_database()
    """
    recreate_database()

205 

206 

def generate_user(*, delete_user=False, **kwargs):
    """
    Create a new user along with a login session, returns a `(user, token)` pair

    Keyword arguments override the default `User` fields set below. Pass `delete_user=True` to have the user
    flagged as deleted once its session has been created.

    The returned user is detached from any session: its static attributes can be read, but it can't be modified

    Use this most of the time
    """
    # NOTE(review): `auth` is never used below; kept because it's not visible here whether constructing Auth()
    # matters -- confirm and remove
    auth = Auth()

    with session_scope() as session:
        username = "test_user_" + random_hex(16)

        # defaults, any of which the caller can override through kwargs
        defaults = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # the password is just 'password', hardcoded as a ready-made hash because hashing is slow and would
            # otherwise slow down the tests
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "N/A",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "my_travels": "Places",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "new_notifications_enabled": True,
        }

        user = User(**{**defaults, **kwargs})
        session.add(user)
        # flush so that user.id is available for the related rows below
        session.flush()

        for visited_code in ("CHE", "REU", "FIN"):
            session.add(RegionVisited(user_id=user.id, region_code=visited_code))

        for lived_code in ("ESP", "FRA", "EST"):
            session.add(RegionLived(user_id=user.id, region_code=lived_code))

        spoken_languages = (
            ("fin", LanguageFluency.fluent),
            ("fra", LanguageFluency.beginner),
        )
        for language_code, fluency in spoken_languages:
            session.add(LanguageAbility(user_id=user.id, language_code=language_code, fluency=fluency))

        # committing expires the user, so from here on it's "dirty"
        session.commit()

        class _DummyContext:
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False)

        # a deleted user aborts session creation, so the flag is only set afterwards (hence the second commit)
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        session.commit()

        # refreshing undoes the expiry caused by the commit
        session.refresh(user)
        # detach the user from the session, so that it can be used outside of it
        session.expunge(user)

    return user, token

292 

293 

def get_user_id_and_token(session, username):
    """
    Look up the user with the given username and return `(user_id, token)`, the token being that of the user's
    session row; both lookups use scalar_one(), so exactly one match is expected for each
    """
    user_query = select(User).where(User.username == username)
    user_id = session.execute(user_query).scalar_one().id

    session_query = select(UserSession).where(UserSession.user_id == user_id)
    token = session.execute(session_query).scalar_one().token

    return user_id, token

298 

299 

def make_friends(user1, user2):
    """
    Add an already-accepted friend relationship going from user1 to user2
    """
    with session_scope() as session:
        session.add(
            FriendRelationship(from_user_id=user1.id, to_user_id=user2.id, status=FriendStatus.accepted)
        )

308 

309 

def make_user_block(user1, user2):
    """
    Record that user1 is blocking user2
    """
    with session_scope() as session:
        session.add(UserBlock(blocking_user_id=user1.id, blocked_user_id=user2.id))
        session.commit()

318 

319 

def make_user_invisible(user_id):
    """
    Flag the user with the given id as banned
    """
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True

323 

324 

# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Fetch the friend relationship between two users, whichever of them initiated it

    Returns the FriendRelationship detached from its session, so it can be used after this function returns, or
    None if there is no relationship between the two users.
    """
    with session_scope() as session:
        # the two conditions of each direction have to be combined with `&`: Python's `and` doesn't build an SQL
        # AND on column expressions, it just evaluates to one of its operands and silently drops the other condition
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # there may be no relationship at all, and expunging None would raise
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship

339 

340 

class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Auth metadata plugin that adds the `cookie: couchers-sesh=...` header for the given token to the metadata
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        cookie_header = ("cookie", f"couchers-sesh={self.token}")
        # the metadata is a tuple of (key, value) pairs; the second argument (None) means there is no error
        callback((cookie_header,), None)

351 

352 

class FakeRpcError(grpc.RpcError):
    """
    RpcError raised by FakeChannel.abort, exposing the status code and details through code() and details()
    """

    def __init__(self, code, details):
        self._code, self._details = code, details

    def code(self):
        return self._code

    def details(self):
        return self._details

363 

364 

class FakeChannel:
    """
    In-process stand-in that servicers get registered on and stubs get created from

    It collects the method handlers of the registered servicers and, when a stub method is called, invokes the
    matching handler directly, passing itself as the context (so handlers see `user_id` and `abort` on it).
    """

    def __init__(self, user_id=None):
        self.user_id = user_id
        # method uri -> method handler
        self.handlers = {}

    def abort(self, code, details):
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        # only the method handlers of the first generic handler are registered
        first_generic_handler = generic_rpc_handlers[0]
        self.handlers.update(first_generic_handler._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        # looked up right away, so an unregistered uri fails here and not at call time
        handler = self.handlers[uri]

        def fake_handler(request):
            # Run both the request and the response through a full serialization
            # cycle, to catch accidental use of unserializable data.
            decoded_request = handler.request_deserializer(request_serializer(request))

            reply = handler.unary_unary(decoded_request, self)

            serialized_reply = handler.response_serializer(reply)
            return response_deserializer(serialized_reply)

        return fake_handler

393 

394 

@contextmanager
def auth_api_session():
    """
    Create an Auth API for testing, yields `(stub, interceptor)`

    The interceptor keeps the initial metadata of the latest call in its `latest_headers` dict.

    This has to go through a real server, since the Auth service plays around with headers
    """

    class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
        def __init__(self):
            self.latest_headers = {}

        def intercept_unary_unary(self, continuation, client_call_details, request):
            call = continuation(client_call_details, request)
            self.latest_headers = dict(call.initial_metadata())
            return call

    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool)
        # port 0 lets a free port be picked, the one actually bound is returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), grpc_server)
        grpc_server.start()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", grpc.local_channel_credentials()) as raw_channel:
                header_keeper = _MetadataKeeperInterceptor()
                intercepted_channel = grpc.intercept_channel(raw_channel, header_keeper)
                yield auth_pb2_grpc.AuthStub(intercepted_channel), header_keeper
        finally:
            grpc_server.stop(None).wait()

425 

426 

@contextmanager
def api_session(token):
    """
    Yield an API stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake)
    yield api_pb2_grpc.APIStub(fake)

435 

436 

@contextmanager
def real_api_session(token):
    """
    Yield an API stub for testing that talks to a real server over TCP sockets, the token is used for auth
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor()])
        # port 0 lets a free port be picked, the one actually bound is returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), grpc_server)
        grpc_server.start()

        # every call carries the session cookie on top of the local channel credentials
        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            grpc_server.stop(None).wait()

456 

457 

@contextmanager
def real_admin_session(token):
    """
    Yield an Admin stub for testing that talks to a real server over TCP sockets, the token is used for auth
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor()])
        # port 0 lets a free port be picked, the one actually bound is returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), grpc_server)
        grpc_server.start()

        # every call carries the session cookie on top of the local channel credentials
        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            grpc_server.stop(None).wait()

477 

478 

@contextmanager
def real_jail_session(token):
    """
    Yield a Jail stub for testing that talks to a real server over TCP sockets, the token is used for auth
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor()])
        # port 0 lets a free port be picked, the one actually bound is returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), grpc_server)
        grpc_server.start()

        # every call carries the session cookie on top of the local channel credentials
        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            grpc_server.stop(None).wait()

498 

499 

def fake_channel(token):
    """
    Build a FakeChannel for the user id that the (non API key) token resolves to
    """
    # three values come back, only the user id is used here
    user_id, _jailed, _is_superuser = _try_get_and_update_user_details(token, is_api_key=False)
    return FakeChannel(user_id=user_id)

503 

504 

@contextmanager
def conversations_session(token):
    """
    Yield a Conversations stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake)
    yield conversations_pb2_grpc.ConversationsStub(fake)

513 

514 

@contextmanager
def requests_session(token):
    """
    Yield a Requests stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake)
    yield requests_pb2_grpc.RequestsStub(fake)

523 

524 

@contextmanager
def threads_session(token):
    """
    Yield a Threads stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake)
    yield threads_pb2_grpc.ThreadsStub(fake)

530 

531 

@contextmanager
def discussions_session(token):
    """
    Yield a Discussions stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake)
    yield discussions_pb2_grpc.DiscussionsStub(fake)

537 

538 

@contextmanager
def donations_session(token):
    """
    Yield a Donations stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake)
    yield donations_pb2_grpc.DonationsStub(fake)

544 

545 

@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub for testing that talks to a real server over TCP sockets

    No call credentials are attached, the channel only uses the local channel credentials
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor()])
        # port 0 lets a free port be picked, the one actually bound is returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), grpc_server)
        grpc_server.start()

        channel_creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            grpc_server.stop(None).wait()

564 

565 

@contextmanager
def pages_session(token):
    """
    Yield a Pages stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake)
    yield pages_pb2_grpc.PagesStub(fake)

571 

572 

@contextmanager
def communities_session(token):
    """
    Yield a Communities stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake)
    yield communities_pb2_grpc.CommunitiesStub(fake)

578 

579 

@contextmanager
def groups_session(token):
    """
    Yield a Groups stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake)
    yield groups_pb2_grpc.GroupsStub(fake)

585 

586 

@contextmanager
def blocking_session(token):
    """
    Yield a Blocking stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake)
    yield blocking_pb2_grpc.BlockingStub(fake)

592 

593 

@contextmanager
def notifications_session(token):
    """
    Yield a Notifications stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake)
    yield notifications_pb2_grpc.NotificationsStub(fake)

599 

600 

@contextmanager
def account_session(token):
    """
    Yield an Account stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake)
    yield account_pb2_grpc.AccountStub(fake)

609 

610 

@contextmanager
def search_session(token):
    """
    Yield a Search stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake)
    yield search_pb2_grpc.SearchStub(fake)

619 

620 

@contextmanager
def references_session(token):
    """
    Yield a References stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake)
    yield references_pb2_grpc.ReferencesStub(fake)

629 

630 

@contextmanager
def reporting_session(token):
    """
    Yield a Reporting stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake)
    yield reporting_pb2_grpc.ReportingStub(fake)

636 

637 

@contextmanager
def events_session(token):
    """
    Yield an Events stub for testing, backed by a fake channel authenticated with the token
    """
    fake = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake)
    yield events_pb2_grpc.EventsStub(fake)

643 

644 

@contextmanager
def bugs_session(token=None):
    """
    Yield a Bugs stub for testing

    With a token the fake channel is authenticated with it, without one a bare FakeChannel (no user id) is used
    """
    fake = fake_channel(token) if token else FakeChannel()
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake)
    yield bugs_pb2_grpc.BugsStub(fake)

653 

654 

@contextmanager
def resources_session():
    """
    Yield a Resources stub for testing, backed by a bare FakeChannel (no user id)
    """
    fake = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake)
    yield resources_pb2_grpc.ResourcesStub(fake)

660 

661 

@contextmanager
def media_session(bearer_token):
    """
    Yield a Media stub for testing that talks to a fresh real server, the bearer token is used for media auth
    on both the server (interceptor) and the client (access token call credentials)
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[auth_interceptor])
        # port 0 lets a free port be picked, the one actually bound is returned
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_servicer = Media()
        media_pb2_grpc.add_MediaServicer_to_server(media_servicer, grpc_server)
        grpc_server.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            grpc_server.stop(None).wait()

684 

685 

@pytest.fixture()
def testconfig():
    """
    Pytest fixture that overlays a fixed set of testing values on the global config for the duration of the test,
    and puts the previous contents back afterwards
    """
    saved_config = config.copy()
    config.clear()
    config.update(saved_config)

    config.update(
        {
            "IN_TEST": True,
            # general
            "DEV": True,
            "SECRET": bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b"),
            "VERSION": "testing_version",
            "BASE_URL": "http://localhost:3000",
            "COOKIE_DOMAIN": "localhost",
            # sms
            "ENABLE_SMS": False,
            "SMS_SENDER_ID": "invalid",
            # email
            "ENABLE_EMAIL": False,
            "NOTIFICATION_EMAIL_SENDER": "Couchers.org",
            "NOTIFICATION_EMAIL_ADDRESS": "notify@couchers.org.invalid",
            "NOTIFICATION_EMAIL_PREFIX": "[TEST] ",
            "REPORTS_EMAIL_RECIPIENT": "reports@couchers.org.invalid",
            "CONTRIBUTOR_FORM_EMAIL_RECIPIENT": "forms@couchers.org.invalid",
            # donations
            "ENABLE_DONATIONS": False,
            "STRIPE_API_KEY": "",
            "STRIPE_WEBHOOK_SECRET": "",
            "STRIPE_RECURRING_PRODUCT_ID": "",
            # smtp
            "SMTP_HOST": "localhost",
            "SMTP_PORT": 587,
            "SMTP_USERNAME": "username",
            "SMTP_PASSWORD": "password",
            # media
            "ENABLE_MEDIA": True,
            "MEDIA_SERVER_SECRET_KEY": bytes.fromhex(
                "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
            ),
            "MEDIA_SERVER_BEARER_TOKEN": "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e",
            "MEDIA_SERVER_BASE_URL": "http://127.0.0.1:5000",
            # bug tool
            "BUG_TOOL_ENABLED": False,
            "BUG_TOOL_GITHUB_REPO": "org/repo",
            "BUG_TOOL_GITHUB_USERNAME": "user",
            "BUG_TOOL_GITHUB_TOKEN": "token",
            # mailchimp
            "MAILCHIMP_ENABLED": False,
            "MAILCHIMP_API_KEY": "f...",
            "MAILCHIMP_DC": "us10",
            "MAILCHIMP_LIST_ID": "b...",
        }
    )

    yield None

    # put back whatever was there before the test
    config.clear()
    config.update(saved_config)

741 

742 

@pytest.fixture
def fast_passwords():
    """
    Pytest fixture that swaps password hashing and verification for trivial fakes while the test runs
    """
    # hashing passwords is slow by design, which slows down the tests; so the hashing step is replaced by simply
    # prefixing the password, and verification by comparing against that

    def fast_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def fast_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == fast_hash(password)

    verify_patch = patch("couchers.crypto.nacl.pwhash.verify", fast_verify)
    hash_patch = patch("couchers.crypto.nacl.pwhash.str", fast_hash)
    with verify_patch, hash_patch:
        yield