Coverage for src/tests/test_fixtures.py: 98%
507 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1import os
2from concurrent import futures
3from contextlib import contextmanager
4from dataclasses import dataclass
5from datetime import date
6from pathlib import Path
7from unittest.mock import patch
9import grpc
10import pytest
11from sqlalchemy.orm import close_all_sessions
from sqlalchemy.sql import and_, or_, text
14from couchers.config import config
15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
16from couchers.crypto import random_hex
17from couchers.db import _get_base_engine, session_scope
18from couchers.descriptor_pool import get_descriptor_pool
19from couchers.interceptors import AuthValidatorInterceptor, SessionInterceptor, _try_get_and_update_user_details
20from couchers.jobs.worker import process_job
21from couchers.models import (
22 Base,
23 FriendRelationship,
24 FriendStatus,
25 HostingStatus,
26 Language,
27 LanguageAbility,
28 LanguageFluency,
29 MeetupStatus,
30 Region,
31 RegionLived,
32 RegionVisited,
33 Upload,
34 User,
35 UserBlock,
36 UserSession,
37)
38from couchers.servicers.account import Account, Iris
39from couchers.servicers.admin import Admin
40from couchers.servicers.api import API
41from couchers.servicers.auth import Auth, create_session
42from couchers.servicers.blocking import Blocking
43from couchers.servicers.bugs import Bugs
44from couchers.servicers.communities import Communities
45from couchers.servicers.conversations import Conversations
46from couchers.servicers.discussions import Discussions
47from couchers.servicers.donations import Donations, Stripe
48from couchers.servicers.events import Events
49from couchers.servicers.gis import GIS
50from couchers.servicers.groups import Groups
51from couchers.servicers.jail import Jail
52from couchers.servicers.media import Media, get_media_auth_interceptor
53from couchers.servicers.notifications import Notifications
54from couchers.servicers.pages import Pages
55from couchers.servicers.references import References
56from couchers.servicers.reporting import Reporting
57from couchers.servicers.requests import Requests
58from couchers.servicers.resources import Resources
59from couchers.servicers.search import Search
60from couchers.servicers.threads import Threads
61from couchers.sql import couchers_select as select
62from couchers.utils import create_coordinate, now
63from proto import (
64 account_pb2_grpc,
65 admin_pb2_grpc,
66 annotations_pb2,
67 api_pb2_grpc,
68 auth_pb2_grpc,
69 blocking_pb2_grpc,
70 bugs_pb2_grpc,
71 communities_pb2_grpc,
72 conversations_pb2_grpc,
73 discussions_pb2_grpc,
74 donations_pb2_grpc,
75 events_pb2_grpc,
76 gis_pb2_grpc,
77 groups_pb2_grpc,
78 iris_pb2_grpc,
79 jail_pb2_grpc,
80 media_pb2_grpc,
81 notifications_pb2_grpc,
82 pages_pb2_grpc,
83 references_pb2_grpc,
84 reporting_pb2_grpc,
85 requests_pb2_grpc,
86 resources_pb2_grpc,
87 search_pb2_grpc,
88 stripe_pb2_grpc,
89 threads_pb2_grpc,
90)
def drop_all():
    """drop everything currently in the database"""
    with session_scope() as session:
        # postgis is required for all the Geographic Information System (GIS) stuff
        # pg_trgm is required for trigram based search
        # btree_gist is required for gist-based exclusion constraints
        session.execute(
            text(
                "DROP SCHEMA IF EXISTS public CASCADE;"
                "DROP SCHEMA IF EXISTS logging CASCADE;"
                "DROP EXTENSION IF EXISTS postgis CASCADE;"
                "CREATE SCHEMA public;"
                "CREATE SCHEMA logging;"
                "CREATE EXTENSION postgis;"
                "CREATE EXTENSION pg_trgm;"
                "CREATE EXTENSION btree_gist;"
            )
        )

    # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise
    # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
    # and similar errors
    _get_base_engine().dispose()

    # also discard any ORM sessions still referencing the schema we just dropped
    close_all_sessions()
def create_schema_from_models():
    """
    Build the full database schema directly from the current SQLAlchemy models,
    rather than incrementally through migrations.
    """
    # install the custom slugify SQL function before creating the tables
    slugify_sql = (Path(__file__).parent / "slugify.sql").read_text()
    with session_scope() as session:
        session.execute(text(slugify_sql))

    Base.metadata.create_all(_get_base_engine())
def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database

    Seeds a fixed set of regions and languages, then loads the fake
    timezone-area SQL into the given session.
    """
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # read the fake timezone areas first so a missing file fails before any rows are added
    with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f:
        tz_sql = f.read()

    session.add_all([Region(code=code, name=name) for code, name in regions])
    session.add_all([Language(code=code, name=name) for code, name in languages])

    session.execute(text(tz_sql))
def recreate_database():
    """
    Connect to a running Postgres database, build it using metadata.create_all()
    """

    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # drop everything currently in the database
    drop_all()

    # create everything from the current models, not incrementally through migrations
    create_schema_from_models()

    # seed the static reference data (regions, languages, timezone areas)
    with session_scope() as session:
        populate_testing_resources(session)
@pytest.fixture()
def db():
    """
    Pytest fixture to connect to a running Postgres database and build it using metadata.create_all()

    Function-scoped, so each test using it starts from a freshly rebuilt schema.
    """

    recreate_database()
def generate_user(*, delete_user=False, complete_profile=True, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Any default User column value below can be overridden via **kwargs.
    Returns a (user, token) tuple where token authenticates as that user.
    """
    # NOTE(review): this Auth instance appears unused below — confirm whether constructing it has required side effects
    auth = Auth()

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "N/A",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied overrides take precedence over the defaults above
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        session.flush()  # assigns user.id without committing yet

        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        # minimal stand-in for a grpc context, just enough for create_session
        class _DummyContext:
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # give the user an avatar and a long about_me so the profile counts as complete
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # allows detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session, username):
    """Look up a user's id and their session token by username."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token
def make_friends(user1, user2):
    """Insert an accepted FriendRelationship so user1 and user2 are friends."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()
def make_user_invisible(user_id):
    """Ban the given user, which hides them from the rest of the platform."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between user1 and user2 (in either direction),
    detached from the session, or None if there is none.

    Bug fix: the original used Python's `and` inside `or_(...)`. SQLAlchemy column
    comparisons are truthy objects, so `A == x and B == y` evaluates to just
    `B == y`, making the filter `or_(to_user_id == user2.id, to_user_id == user1.id)`
    and matching relationships with unrelated users. `and_()` builds the intended
    SQL conjunction.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    and_(FriendRelationship.from_user_id == user1.id, FriendRelationship.to_user_id == user2.id),
                    and_(FriendRelationship.from_user_id == user2.id, FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # expunging None would raise; only detach an actual row
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        # grpc calls this per-RPC; hand back the cookie header and no error
        metadata = (("cookie", f"couchers-sesh={self.token}"),)
        callback(metadata, None)
@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields (AuthStub, metadata_interceptor); the interceptor records the response
    headers of the most recent call so tests can inspect e.g. Set-Cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:

                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    # client-side interceptor that stashes the headers of the latest unary-unary call
                    def __init__(self):
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        self.latest_headers = dict(call.initial_metadata())
                        # raw form kept too: dict() collapses repeated header keys
                        self.latest_header_raw = call.initial_metadata()
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            # block until the server has fully shut down
            server.stop(None).wait()
@contextmanager
def api_session(token):
    """Yield an API stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    yield api_pb2_grpc.APIStub(chan)
@contextmanager
def real_api_session(token):
    """
    Create an API for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        # attach the session cookie to every call made through the channel
        call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)

        try:
            with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            # block until the server has fully shut down
            server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """
    Create an Admin service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        # attach the session cookie to every call made through the channel
        call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)

        try:
            with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            # block until the server has fully shut down
            server.stop(None).wait()
@contextmanager
def real_account_session(token):
    """
    Create an Account service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
        server.start()

        # attach the session cookie to every call made through the channel
        call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)

        try:
            with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            # block until the server has fully shut down
            server.stop(None).wait()
@contextmanager
def real_jail_session(token):
    """
    Create a Jail service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        # attach the session cookie to every call made through the channel
        call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)

        try:
            with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            # block until the server has fully shut down
            server.stop(None).wait()
@contextmanager
def gis_session(token):
    """Yield a GIS stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), chan)
    yield gis_pb2_grpc.GISStub(chan)
class FakeRpcError(grpc.RpcError):
    """Minimal grpc.RpcError stand-in raised by FakeChannel.abort, exposing code() and details()."""

    def __init__(self, code, details):
        self._status_code = code
        self._status_details = details

    def code(self):
        return self._status_code

    def details(self):
        return self._status_details
def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """
    Assert that the calling user may invoke this method, based on the service-level
    auth_level annotation stored in the proto descriptor.
    """
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, method_name = method.split("/")

    # look up the custom auth_level option declared on the service in its proto file
    service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = service_options.Extensions[annotations_pb2.auth_level]
    # every service must declare one of the known auth levels
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    if not user_id:
        # unauthenticated callers may only hit open services
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
    else:
        assert not (
            auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser
        ), "Non-superuser tried to call superuser API"
        assert not (
            is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED]
        ), "User is jailed but tried to call non-open/non-jailed API"
class FakeChannel:
    """
    In-process stand-in for a gRPC server/channel pair.

    Servicers register handlers on it via add_generic_rpc_handlers; stubs built
    against it invoke those handlers directly (no sockets), while still doing a
    full request/response (de)serialization round trip. It also plays the role
    of the servicer context (hence abort()).
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def abort(self, code, details):
        # mirrors grpc.ServicerContext.abort: raises instead of returning
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        # reuse grpc's internal validation so misregistered servicers fail like they would on a real server
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        handler = self.handlers[uri]

        # enforce the service's declared auth level for this (possibly anonymous) user
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                response = handler.unary_unary(request, self, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler
def fake_channel(token=None):
    """Build a FakeChannel; with a token, resolve and attach the user's auth details."""
    if not token:
        return FakeChannel()
    user_id, is_jailed, is_superuser, token_expiry = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)
@contextmanager
def conversations_session(token):
    """Yield a Conversations stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    yield conversations_pb2_grpc.ConversationsStub(chan)
@contextmanager
def requests_session(token):
    """Yield a Requests stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    yield requests_pb2_grpc.RequestsStub(chan)
@contextmanager
def threads_session(token):
    """Yield a Threads stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    yield threads_pb2_grpc.ThreadsStub(chan)
@contextmanager
def discussions_session(token):
    """Yield a Discussions stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    yield discussions_pb2_grpc.DiscussionsStub(chan)
@contextmanager
def donations_session(token):
    """Yield a Donations stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    yield donations_pb2_grpc.DonationsStub(chan)
@contextmanager
def real_stripe_session():
    """
    Create a Stripe service for testing, using TCP sockets

    No auth: the Stripe webhook endpoint is called by Stripe, not by a logged-in user.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            # block until the server has fully shut down
            server.stop(None).wait()
@contextmanager
def real_iris_session():
    """
    Create an Iris (ID verification) service for testing, using TCP sockets; no user auth.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
        server.start()

        creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            # block until the server has fully shut down
            server.stop(None).wait()
@contextmanager
def pages_session(token):
    """Yield a Pages stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    yield pages_pb2_grpc.PagesStub(chan)
@contextmanager
def communities_session(token):
    """Yield a Communities stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    yield communities_pb2_grpc.CommunitiesStub(chan)
@contextmanager
def groups_session(token):
    """Yield a Groups stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    yield groups_pb2_grpc.GroupsStub(chan)
@contextmanager
def blocking_session(token):
    """Yield a Blocking stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    yield blocking_pb2_grpc.BlockingStub(chan)
@contextmanager
def notifications_session(token):
    """Yield a Notifications stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    yield notifications_pb2_grpc.NotificationsStub(chan)
@contextmanager
def account_session(token):
    """Yield an Account stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    yield account_pb2_grpc.AccountStub(chan)
@contextmanager
def search_session(token):
    """Yield a Search stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    yield search_pb2_grpc.SearchStub(chan)
@contextmanager
def references_session(token):
    """Yield a References stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    yield references_pb2_grpc.ReferencesStub(chan)
@contextmanager
def reporting_session(token):
    """Yield a Reporting stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    yield reporting_pb2_grpc.ReportingStub(chan)
@contextmanager
def events_session(token):
    """Yield an Events stub backed by an in-process FakeChannel, authenticated with `token`."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    yield events_pb2_grpc.EventsStub(chan)
@contextmanager
def bugs_session(token=None):
    """Yield a Bugs stub backed by an in-process FakeChannel; works with or without auth."""
    chan = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    yield bugs_pb2_grpc.BugsStub(chan)
@contextmanager
def resources_session():
    """Yield a Resources stub backed by an unauthenticated in-process FakeChannel."""
    chan = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    yield resources_pb2_grpc.ResourcesStub(chan)
@contextmanager
def media_session(bearer_token):
    """
    Create a fresh Media API for testing, uses the bearer token for media auth
    """
    # media uses bearer-token auth (enforced by its own interceptor) instead of session cookies
    media_auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[media_auth_interceptor, SessionInterceptor()])
        # port 0: let the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        servicer = Media()
        media_pb2_grpc.add_MediaServicer_to_server(servicer, server)
        server.start()

        # send the bearer token with every call made through the channel
        call_creds = grpc.access_token_call_credentials(bearer_token)
        comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)

        try:
            with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            # block until the server has fully shut down
            server.stop(None).wait()
@pytest.fixture(scope="class")
def testconfig():
    """
    Override the global config with known testing values for the duration of a
    test class, restoring the previous config afterwards.
    """
    # snapshot the current config so it can be restored on teardown
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config["IN_TEST"] = True

    config["DEV"] = True
    config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
    config["VERSION"] = "testing_version"
    config["BASE_URL"] = "http://localhost:3000"
    config["BACKEND_BASE_URL"] = "http://localhost:8888"
    config["CONSOLE_BASE_URL"] = "http://localhost:8888"
    config["COOKIE_DOMAIN"] = "localhost"

    config["ENABLE_SMS"] = False
    config["SMS_SENDER_ID"] = "invalid"

    config["ENABLE_EMAIL"] = False
    config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
    config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
    config["NOTIFICATION_PREFIX"] = "[TEST] "
    config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
    config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
    config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"

    config["ENABLE_DONATIONS"] = False
    config["STRIPE_API_KEY"] = ""
    config["STRIPE_WEBHOOK_SECRET"] = ""
    config["STRIPE_RECURRING_PRODUCT_ID"] = ""

    config["ENABLE_STRONG_VERIFICATION"] = False
    config["IRIS_ID_PUBKEY"] = ""
    config["IRIS_ID_SECRET"] = ""
    # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
    config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    config["SMTP_HOST"] = "localhost"
    config["SMTP_PORT"] = 587
    config["SMTP_USERNAME"] = "username"
    config["SMTP_PASSWORD"] = "password"

    config["ENABLE_MEDIA"] = True
    config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
        "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
    )
    config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
    config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001"
    config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001"

    config["BUG_TOOL_ENABLED"] = False
    config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
    config["BUG_TOOL_GITHUB_USERNAME"] = "user"
    config["BUG_TOOL_GITHUB_TOKEN"] = "token"

    config["LISTMONK_ENABLED"] = False
    config["LISTMONK_BASE_URL"] = "https://localhost"
    config["LISTMONK_API_KEY"] = "..."
    config["LISTMONK_LIST_UUID"] = "..."

    config["PUSH_NOTIFICATIONS_ENABLED"] = True
    config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
    config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"

    yield None

    # teardown: restore whatever config was in place before the test class ran
    config.clear()
    config.update(prevconfig)
@pytest.fixture
def fast_passwords():
    """
    Skip the deliberately slow argon2 password hashing during tests by patching
    nacl's hash/verify with a trivial reversible scheme.
    """

    def fast_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def fast_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == fast_hash(password)

    with patch("couchers.crypto.nacl.pwhash.verify", fast_verify), patch(
        "couchers.crypto.nacl.pwhash.str", fast_hash
    ):
        yield
def process_jobs():
    """Run queued background jobs until the queue is drained."""
    while True:
        if not process_job():
            break
@contextmanager
def mock_notification_email():
    """
    Patch couchers.email._queue_email and yield the mock for inspection.

    After the with-block exits (and the patch is removed), the background job
    queue is drained so notification jobs generated inside the block run.
    """
    with patch("couchers.email._queue_email") as mock:
        yield mock
    process_jobs()
@dataclass
class EmailData:
    """Captured keyword arguments of one queued email (built by email_fields)."""

    sender_name: str
    sender_email: str
    recipient: str
    subject: str
    plain: str  # plain-text body
    html: str  # HTML body
    source_data: str
    list_unsubscribe_header: str
def email_fields(mock, call_ix=0):
    """Extract the keyword arguments of the call_ix-th call to the mocked email queue as EmailData."""
    _args, kwargs = mock.call_args_list[call_ix]
    names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    # missing kwargs become None, matching kw.get() semantics
    return EmailData(**{name: kwargs.get(name) for name in names})
@pytest.fixture
def push_collector():
    """
    See test_SendTestPushNotification for an example on how to use this fixture

    Patches the push-notification sender and yields a PushCollector that records
    every (user_id, push) pair for later assertions.
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            try:
                return self.kwargs[attr]
            except KeyError:
                # re-raise as AttributeError so attribute access semantics stay correct
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            # all pushes sent to the given user, in send order
            return [kwargs for uid, kwargs in self.pushes if uid == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            # signature matches couchers.notifications.push._push_to_user, which this replaces
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            # checks the ix-th push to user_id contains each given field with the given value
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert (
                    push.kwargs[kwarg] == kwargs[kwarg]
                ), f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector