Coverage for src/tests/test_fixtures.py: 99%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import os
2from concurrent import futures
3from contextlib import contextmanager
4from datetime import date
5from pathlib import Path
6from unittest.mock import patch
8import grpc
9import pytest
10from sqlalchemy.sql import or_, text
12from couchers.config import config
13from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
14from couchers.crypto import random_hex
15from couchers.db import get_engine, session_scope
16from couchers.interceptors import AuthValidatorInterceptor, _try_get_and_update_user_details
17from couchers.models import (
18 Base,
19 FriendRelationship,
20 FriendStatus,
21 HostingStatus,
22 Language,
23 LanguageAbility,
24 LanguageFluency,
25 Region,
26 RegionLived,
27 RegionVisited,
28 User,
29 UserBlock,
30 UserSession,
31)
32from couchers.servicers.account import Account
33from couchers.servicers.admin import Admin
34from couchers.servicers.api import API
35from couchers.servicers.auth import Auth, create_session
36from couchers.servicers.blocking import Blocking
37from couchers.servicers.bugs import Bugs
38from couchers.servicers.communities import Communities
39from couchers.servicers.conversations import Conversations
40from couchers.servicers.discussions import Discussions
41from couchers.servicers.donations import Donations, Stripe
42from couchers.servicers.events import Events
43from couchers.servicers.groups import Groups
44from couchers.servicers.jail import Jail
45from couchers.servicers.media import Media, get_media_auth_interceptor
46from couchers.servicers.notifications import Notifications
47from couchers.servicers.pages import Pages
48from couchers.servicers.references import References
49from couchers.servicers.reporting import Reporting
50from couchers.servicers.requests import Requests
51from couchers.servicers.resources import Resources
52from couchers.servicers.search import Search
53from couchers.servicers.threads import Threads
54from couchers.sql import couchers_select as select
55from couchers.utils import create_coordinate, now
56from proto import (
57 account_pb2_grpc,
58 admin_pb2_grpc,
59 api_pb2_grpc,
60 auth_pb2_grpc,
61 blocking_pb2_grpc,
62 bugs_pb2_grpc,
63 communities_pb2_grpc,
64 conversations_pb2_grpc,
65 discussions_pb2_grpc,
66 donations_pb2_grpc,
67 events_pb2_grpc,
68 groups_pb2_grpc,
69 jail_pb2_grpc,
70 media_pb2_grpc,
71 notifications_pb2_grpc,
72 pages_pb2_grpc,
73 references_pb2_grpc,
74 reporting_pb2_grpc,
75 requests_pb2_grpc,
76 resources_pb2_grpc,
77 search_pb2_grpc,
78 stripe_pb2_grpc,
79 threads_pb2_grpc,
80)
def drop_all():
    """drop everything currently in the database"""
    # postgis is required for the Geographic Information System (GIS) stuff,
    # pg_trgm for trigram based search, and btree_gist for gist-based
    # exclusion constraints
    reset_sql = (
        "DROP SCHEMA public CASCADE; DROP SCHEMA IF EXISTS logging CASCADE; "
        "CREATE SCHEMA public; CREATE SCHEMA logging; CREATE EXTENSION postgis; "
        "CREATE EXTENSION pg_trgm; CREATE EXTENSION btree_gist;"
    )
    with session_scope() as session:
        session.execute(text(reset_sql))
def create_schema_from_models():
    """
    Create everything from the current models, not incrementally
    through migrations.
    """
    # install the slugify helper function before creating the tables
    slugify_sql_path = Path(__file__).parent / "slugify.sql"
    with session_scope() as session:
        session.execute(text(slugify_sql_path.read_text()))

    Base.metadata.create_all(get_engine())
def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database
    """
    # (code, name) pairs for the regions available in tests
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    # (code, name) pairs for the languages available in tests
    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # a stand-in for the real timezone areas data
    with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f:
        tz_sql = f.read()

    session.add_all(Region(code=code, name=name) for code, name in regions)
    session.add_all(Language(code=code, name=name) for code, name in languages)

    session.execute(text(tz_sql))
def recreate_database():
    """
    Connect to a running Postgres database, build it using metadata.create_all()
    """
    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # wipe whatever is currently in the database
    drop_all()

    # build everything from the current models, skipping migrations
    create_schema_from_models()

    with session_scope() as session:
        populate_testing_resources(session)
@pytest.fixture()
def db():
    """Pytest fixture: rebuild the running Postgres test database from the current models."""
    recreate_database()
def generate_user(*, delete_user=False, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Keyword arguments are passed through as overrides for the User columns.
    """
    # NOTE(review): `auth` looks unused below — presumably kept for Auth() construction side effects, confirm
    auth = Auth()

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "N/A",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "my_travels": "Places",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "new_notifications_enabled": True,
        }

        # caller-supplied overrides take precedence over the defaults above
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        # flush so user.id is assigned and usable for the related rows below
        session.flush()

        # give the user some regions visited/lived and language abilities so tests have data
        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        # create_session only needs invocation_metadata() from its context; an empty one suffices
        class _DummyContext:
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        session.commit()

        # refresh it, undoes the expiry
        session.refresh(user)
        # expunge detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session, username):
    """Look up a user's id and their session token by username."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token
def make_friends(user1, user2):
    """Create an accepted friend relationship from user1 to user2."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()
def make_user_invisible(user_id):
    """Ban the given user, which hides them from other users."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between the two users (in either direction),
    detached from the session, or None if there is none.
    """
    # local import: `and_` is not in this file's top-level sqlalchemy import
    from sqlalchemy.sql import and_

    with session_scope() as session:
        # BUG FIX: the original used Python's `and` between column comparisons,
        # which does not build a SQL conjunction — the `to_user_id` constraints
        # were silently dropped, so any relationship involving either user
        # matched (and scalar_one_or_none could raise MultipleResultsFound).
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    and_(FriendRelationship.from_user_id == user1.id, FriendRelationship.to_user_id == user2.id),
                    and_(FriendRelationship.from_user_id == user2.id, FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # expunge only accepts a real instance; guard the no-relationship case
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        metadata = (("cookie", f"couchers-sesh={self.token}"),)
        callback(metadata, None)
class FakeRpcError(grpc.RpcError):
    """Minimal grpc.RpcError stand-in that carries a status code and a details string."""

    def __init__(self, code, details):
        self._status_code = code
        self._status_details = details

    def code(self):
        return self._status_code

    def details(self):
        return self._status_details
class FakeChannel:
    """
    In-process stand-in for a gRPC server/channel pair: servicers register
    their handlers here, and stubs invoke them directly without a network.
    """

    def __init__(self, user_id=None):
        self.handlers = {}
        self.user_id = user_id

    def abort(self, code, details):
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)
        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        handler = self.handlers[uri]

        def fake_handler(request):
            # Round-trip both the request and the response through
            # (de)serialization to catch accidental use of unserializable data.
            wire_request = request_serializer(request)
            request = handler.request_deserializer(wire_request)

            response = handler.unary_unary(request, self)

            wire_response = handler.response_serializer(response)
            return response_deserializer(wire_response)

        return fake_handler
@contextmanager
def auth_api_session():
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields an (AuthStub, metadata_interceptor) pair; the interceptor's
    latest_headers holds the response headers of the most recent call.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor)
        # port 0 lets the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:

                # records the initial metadata (response headers) of each call so
                # tests can inspect e.g. the cookie headers set by the server
                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    def __init__(self):
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        self.latest_headers = dict(call.initial_metadata())
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            server.stop(None).wait()
@contextmanager
def api_session(token):
    """Yield an API stub over an in-process fake channel, authenticated via the token."""
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    yield api_pb2_grpc.APIStub(chan)
@contextmanager
def real_api_session(token):
    """
    Create an API for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        # attach the session cookie to every call over a local secure channel
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """
    Create an Admin service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        # attach the session cookie to every call over a local secure channel
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_jail_session(token):
    """
    Create a Jail service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        # attach the session cookie to every call over a local secure channel
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            server.stop(None).wait()
def fake_channel(token):
    """Build a FakeChannel acting as the user that owns the given session token."""
    user_id, _jailed, _is_superuser = _try_get_and_update_user_details(token, is_api_key=False)
    return FakeChannel(user_id=user_id)
@contextmanager
def conversations_session(token):
    """Yield a Conversations stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    yield conversations_pb2_grpc.ConversationsStub(chan)
@contextmanager
def requests_session(token):
    """Yield a Requests stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    yield requests_pb2_grpc.RequestsStub(chan)
@contextmanager
def threads_session(token):
    """Yield a Threads stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    yield threads_pb2_grpc.ThreadsStub(chan)
@contextmanager
def discussions_session(token):
    """Yield a Discussions stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    yield discussions_pb2_grpc.DiscussionsStub(chan)
@contextmanager
def donations_session(token):
    """Yield a Donations stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    yield donations_pb2_grpc.DonationsStub(chan)
@contextmanager
def real_stripe_session():
    """
    Create a Stripe service for testing, using TCP sockets
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        try:
            # no per-call cookie credentials here, unlike the other real_* helpers
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def pages_session(token):
    """Yield a Pages stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    yield pages_pb2_grpc.PagesStub(chan)
@contextmanager
def communities_session(token):
    """Yield a Communities stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    yield communities_pb2_grpc.CommunitiesStub(chan)
@contextmanager
def groups_session(token):
    """Yield a Groups stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    yield groups_pb2_grpc.GroupsStub(chan)
@contextmanager
def blocking_session(token):
    """Yield a Blocking stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    yield blocking_pb2_grpc.BlockingStub(chan)
@contextmanager
def notifications_session(token):
    """Yield a Notifications stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    yield notifications_pb2_grpc.NotificationsStub(chan)
@contextmanager
def account_session(token):
    """Yield an Account stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    yield account_pb2_grpc.AccountStub(chan)
@contextmanager
def search_session(token):
    """Yield a Search stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    yield search_pb2_grpc.SearchStub(chan)
@contextmanager
def references_session(token):
    """Yield a References stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    yield references_pb2_grpc.ReferencesStub(chan)
@contextmanager
def reporting_session(token):
    """Yield a Reporting stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    yield reporting_pb2_grpc.ReportingStub(chan)
@contextmanager
def events_session(token):
    """Yield an Events stub over an in-process fake channel, authed via the token."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    yield events_pb2_grpc.EventsStub(chan)
@contextmanager
def bugs_session(token=None):
    """Yield a Bugs stub; authenticated when a token is given, anonymous otherwise."""
    chan = fake_channel(token) if token else FakeChannel()
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    yield bugs_pb2_grpc.BugsStub(chan)
@contextmanager
def resources_session():
    """Yield a Resources stub over a fresh, unauthenticated fake channel."""
    chan = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    yield resources_pb2_grpc.ResourcesStub(chan)
@contextmanager
def media_session(bearer_token):
    """
    Create a fresh Media API for testing, uses the bearer token for media auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[get_media_auth_interceptor(bearer_token)])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), server)
        server.start()

        # the media service authenticates with a bearer token rather than a cookie
        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.access_token_call_credentials(bearer_token),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            server.stop(None).wait()
@pytest.fixture()
def testconfig():
    """
    Fixture that swaps in a known testing configuration for the duration of a
    test, restoring the previous configuration afterwards.
    """
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config.update(
        {
            "IN_TEST": True,
            "DEV": True,
            "SECRET": bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b"),
            "VERSION": "testing_version",
            "BASE_URL": "http://localhost:3000",
            "COOKIE_DOMAIN": "localhost",
            "ENABLE_SMS": False,
            "SMS_SENDER_ID": "invalid",
            "ENABLE_EMAIL": False,
            "NOTIFICATION_EMAIL_SENDER": "Couchers.org",
            "NOTIFICATION_EMAIL_ADDRESS": "notify@couchers.org.invalid",
            "NOTIFICATION_EMAIL_PREFIX": "[TEST] ",
            "REPORTS_EMAIL_RECIPIENT": "reports@couchers.org.invalid",
            "CONTRIBUTOR_FORM_EMAIL_RECIPIENT": "forms@couchers.org.invalid",
            "ENABLE_DONATIONS": False,
            "STRIPE_API_KEY": "",
            "STRIPE_WEBHOOK_SECRET": "",
            "STRIPE_RECURRING_PRODUCT_ID": "",
            "SMTP_HOST": "localhost",
            "SMTP_PORT": 587,
            "SMTP_USERNAME": "username",
            "SMTP_PASSWORD": "password",
            "ENABLE_MEDIA": True,
            "MEDIA_SERVER_SECRET_KEY": bytes.fromhex(
                "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
            ),
            "MEDIA_SERVER_BEARER_TOKEN": "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e",
            "MEDIA_SERVER_BASE_URL": "http://127.0.0.1:5000",
            "BUG_TOOL_ENABLED": False,
            "BUG_TOOL_GITHUB_REPO": "org/repo",
            "BUG_TOOL_GITHUB_USERNAME": "user",
            "BUG_TOOL_GITHUB_TOKEN": "token",
            "MAILCHIMP_ENABLED": False,
            "MAILCHIMP_API_KEY": "f...",
            "MAILCHIMP_DC": "us10",
            "MAILCHIMP_LIST_ID": "b...",
        }
    )

    yield None

    # restore whatever was configured before the test
    config.clear()
    config.update(prevconfig)
@pytest.fixture
def fast_passwords():
    """
    Fixture that swaps the (deliberately slow) password hashing for a trivially
    fast fake, so tests that create or verify passwords stay quick.
    """

    def stub_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def stub_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == stub_hash(password)

    with patch("couchers.crypto.nacl.pwhash.verify", stub_verify), patch(
        "couchers.crypto.nacl.pwhash.str", stub_hash
    ):
        yield