Coverage for src/tests/test_fixtures.py: 98%
507 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
1import os
2from concurrent import futures
3from contextlib import contextmanager
4from dataclasses import dataclass
5from datetime import date
6from pathlib import Path
7from unittest.mock import patch
9import grpc
10import pytest
11from sqlalchemy.orm import close_all_sessions
12from sqlalchemy.sql import or_, text
14from couchers.config import config
15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
16from couchers.crypto import random_hex
17from couchers.db import _get_base_engine, session_scope
18from couchers.descriptor_pool import get_descriptor_pool
19from couchers.interceptors import AuthValidatorInterceptor, SessionInterceptor, _try_get_and_update_user_details
20from couchers.jobs.worker import process_job
21from couchers.models import (
22 Base,
23 FriendRelationship,
24 FriendStatus,
25 HostingStatus,
26 Language,
27 LanguageAbility,
28 LanguageFluency,
29 MeetupStatus,
30 Region,
31 RegionLived,
32 RegionVisited,
33 Upload,
34 User,
35 UserBlock,
36 UserSession,
37)
38from couchers.servicers.account import Account, Iris
39from couchers.servicers.admin import Admin
40from couchers.servicers.api import API
41from couchers.servicers.auth import Auth, create_session
42from couchers.servicers.blocking import Blocking
43from couchers.servicers.bugs import Bugs
44from couchers.servicers.communities import Communities
45from couchers.servicers.conversations import Conversations
46from couchers.servicers.discussions import Discussions
47from couchers.servicers.donations import Donations, Stripe
48from couchers.servicers.events import Events
49from couchers.servicers.gis import GIS
50from couchers.servicers.groups import Groups
51from couchers.servicers.jail import Jail
52from couchers.servicers.media import Media, get_media_auth_interceptor
53from couchers.servicers.notifications import Notifications
54from couchers.servicers.pages import Pages
55from couchers.servicers.references import References
56from couchers.servicers.reporting import Reporting
57from couchers.servicers.requests import Requests
58from couchers.servicers.resources import Resources
59from couchers.servicers.search import Search
60from couchers.servicers.threads import Threads
61from couchers.sql import couchers_select as select
62from couchers.utils import create_coordinate, now
63from proto import (
64 account_pb2_grpc,
65 admin_pb2_grpc,
66 annotations_pb2,
67 api_pb2_grpc,
68 auth_pb2_grpc,
69 blocking_pb2_grpc,
70 bugs_pb2_grpc,
71 communities_pb2_grpc,
72 conversations_pb2_grpc,
73 discussions_pb2_grpc,
74 donations_pb2_grpc,
75 events_pb2_grpc,
76 gis_pb2_grpc,
77 groups_pb2_grpc,
78 iris_pb2_grpc,
79 jail_pb2_grpc,
80 media_pb2_grpc,
81 notifications_pb2_grpc,
82 pages_pb2_grpc,
83 references_pb2_grpc,
84 reporting_pb2_grpc,
85 requests_pb2_grpc,
86 resources_pb2_grpc,
87 search_pb2_grpc,
88 stripe_pb2_grpc,
89 threads_pb2_grpc,
90)
def drop_all():
    """Wipe the test database: drop all schemas/extensions, then recreate blank ones."""
    with session_scope() as session:
        # postgis is required for all the Geographic Information System (GIS) stuff
        # pg_trgm is required for trigram based search
        # btree_gist is required for gist-based exclusion constraints
        reset_sql = (
            "DROP SCHEMA IF EXISTS public CASCADE;"
            "DROP SCHEMA IF EXISTS logging CASCADE;"
            "DROP EXTENSION IF EXISTS postgis CASCADE;"
            "CREATE SCHEMA public;"
            "CREATE SCHEMA logging;"
            "CREATE EXTENSION postgis;"
            "CREATE EXTENSION pg_trgm;"
            "CREATE EXTENSION btree_gist;"
        )
        session.execute(text(reset_sql))

    # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise
    # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
    # and similar errors
    _get_base_engine().dispose()

    close_all_sessions()
def create_schema_from_models():
    """
    Build the full schema directly from the current SQLAlchemy models, rather than
    incrementally through migrations.
    """
    # the slugify function lives in a raw SQL file next to this module
    slugify_sql = (Path(__file__).parent / "slugify.sql").read_text()
    with session_scope() as session:
        session.execute(text(slugify_sql))

    Base.metadata.create_all(_get_base_engine())
def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database: seeds a small
    fixed set of regions and languages, plus fake timezone areas.
    """
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # the "-fake" file is a stand-in for the real timezone areas used in production
    tz_sql = (Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake").read_text()

    session.add_all(Region(code=code, name=name) for code, name in regions)
    session.add_all(Language(code=code, name=name) for code, name in languages)

    session.execute(text(tz_sql))
def recreate_database():
    """
    Connect to a running Postgres database and rebuild it from scratch via metadata.create_all()
    """
    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # wipe whatever is currently there, then rebuild from the models (not migrations)
    drop_all()
    create_schema_from_models()

    # seed the fixed testing regions/languages/timezones
    with session_scope() as session:
        populate_testing_resources(session)
@pytest.fixture()
def db():
    """
    Pytest fixture to connect to a running Postgres database and build it using metadata.create_all()

    Function-scoped: every test using it gets a freshly rebuilt database.
    """
    recreate_database()
def generate_user(*, delete_user=False, complete_profile=True, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Args:
        delete_user: if True, the user is marked deleted after session creation
        complete_profile: if True, the user gets an avatar and a long about_me, so
            has_completed_profile ends up True
        **kwargs: overrides for any column in the default user_opts below
    """
    # NOTE(review): `auth` appears unused in this function — confirm before removing
    auth = Auth()

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "N/A",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "my_travels": "Places",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied kwargs override the defaults above
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        # flush so user.id is populated for the rows added below
        session.flush()

        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        class _DummyContext:
            # minimal stand-in for a grpc ServicerContext; create_session only reads metadata
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # give the user an avatar and a long about_me so the profile counts as complete
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # allows detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session, username):
    """Look up a user's id and their session token by username."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token
def make_friends(user1, user2):
    """Insert an accepted friend relationship from user1 to user2."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        block = UserBlock(
            blocking_user_id=user1.id,
            blocked_user_id=user2.id,
        )
        session.add(block)
        session.commit()
def make_user_invisible(user_id):
    """Ban the given user so they no longer show up."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between the two users (in either direction), detached
    from the session, or None if there is no such relationship.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # bug fix: these previously used the python `and` keyword, which does not combine
                    # SQLAlchemy column expressions into SQL AND; `&` builds the proper conjunction
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # expunging None raises; only detach an actual row
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    gRPC auth plugin that injects the right `cookie: couchers-sesh=...` header into call metadata
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        metadata = (("cookie", f"couchers-sesh={self.token}"),)
        callback(metadata, None)
@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields (AuthStub, metadata_interceptor); the interceptor records the response
    headers of the most recent call so tests can inspect e.g. set-cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0 lets the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:

                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    # client-side interceptor that keeps the last call's initial metadata
                    def __init__(self):
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        # dict() collapses duplicate header keys; the raw list is kept alongside
                        self.latest_headers = dict(call.initial_metadata())
                        self.latest_header_raw = call.initial_metadata()
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            server.stop(None).wait()
@contextmanager
def api_session(token):
    """Yield an API stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    stub = api_pb2_grpc.APIStub(chan)
    yield stub
@contextmanager
def real_api_session(token):
    """Yield an API stub over a real TCP gRPC server, authed via token."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        # session cookie is attached per-call on top of local channel creds
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """Yield an Admin stub over a real TCP gRPC server, authed via token."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        # session cookie is attached per-call on top of local channel creds
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_account_session(token):
    """Yield an Account stub over a real TCP gRPC server, authed via token."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
        server.start()

        # session cookie is attached per-call on top of local channel creds
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_jail_session(token):
    """Yield a Jail stub over a real TCP gRPC server, authed via token."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        # session cookie is attached per-call on top of local channel creds
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def gis_session(token):
    """Yield a GIS stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), chan)
    stub = gis_pb2_grpc.GISStub(chan)
    yield stub
class FakeRpcError(grpc.RpcError):
    """RpcError raised by FakeChannel.abort, exposing the status code and details like a real call error."""

    def __init__(self, code, details):
        self._status_code = code
        self._status_details = details

    def code(self):
        return self._status_code

    def details(self):
        return self._status_details
def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """Assert that the (fake-channel) caller is allowed to call this method, per the service's auth level."""
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, method_name = method.split("/")

    service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = service_options.Extensions[annotations_pb2.auth_level]
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in (
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    )

    if not user_id:
        # unauthenticated callers may only hit fully open APIs
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
    else:
        assert (
            is_superuser or auth_level != annotations_pb2.AUTH_LEVEL_ADMIN
        ), "Non-superuser tried to call superuser API"
        assert not is_jailed or auth_level in (
            annotations_pb2.AUTH_LEVEL_OPEN,
            annotations_pb2.AUTH_LEVEL_JAILED,
        ), "User is jailed but tried to call non-open/non-jailed API"
class FakeChannel:
    """
    Stands in for both a grpc server and channel: servicers register their handlers
    directly on it and calls run in-process (no sockets), each with a fresh db session.
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        # uri -> method handler, populated by add_generic_rpc_handlers
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def abort(self, code, details):
        # mimics grpc's context.abort by raising our fake RpcError
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        # reuses grpc's internal validation of the handler structure
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        # returns a callable that executes the registered handler for this uri in-process
        handler = self.handlers[uri]

        # enforce the service's auth level, as the real interceptors would
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                response = handler.unary_unary(request, self, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler
def fake_channel(token=None):
    """Build a FakeChannel, authenticated as the owner of the token if one is given."""
    if not token:
        return FakeChannel()
    user_id, is_jailed, is_superuser, token_expiry = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)
@contextmanager
def conversations_session(token):
    """Yield a Conversations stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    stub = conversations_pb2_grpc.ConversationsStub(chan)
    yield stub
@contextmanager
def requests_session(token):
    """Yield a Requests stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    stub = requests_pb2_grpc.RequestsStub(chan)
    yield stub
@contextmanager
def threads_session(token):
    """Yield a Threads stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    stub = threads_pb2_grpc.ThreadsStub(chan)
    yield stub
@contextmanager
def discussions_session(token):
    """Yield a Discussions stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    stub = discussions_pb2_grpc.DiscussionsStub(chan)
    yield stub
@contextmanager
def donations_session(token):
    """Yield a Donations stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    stub = donations_pb2_grpc.DonationsStub(chan)
    yield stub
@contextmanager
def real_stripe_session():
    """Yield a Stripe stub over a real TCP gRPC server (no user auth needed)."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_iris_session():
    """Yield an Iris stub over a real TCP gRPC server (no user auth needed)."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def pages_session(token):
    """Yield a Pages stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    stub = pages_pb2_grpc.PagesStub(chan)
    yield stub
@contextmanager
def communities_session(token):
    """Yield a Communities stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    stub = communities_pb2_grpc.CommunitiesStub(chan)
    yield stub
@contextmanager
def groups_session(token):
    """Yield a Groups stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    stub = groups_pb2_grpc.GroupsStub(chan)
    yield stub
@contextmanager
def blocking_session(token):
    """Yield a Blocking stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    stub = blocking_pb2_grpc.BlockingStub(chan)
    yield stub
@contextmanager
def notifications_session(token):
    """Yield a Notifications stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    stub = notifications_pb2_grpc.NotificationsStub(chan)
    yield stub
@contextmanager
def account_session(token):
    """Yield an Account stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    stub = account_pb2_grpc.AccountStub(chan)
    yield stub
@contextmanager
def search_session(token):
    """Yield a Search stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    stub = search_pb2_grpc.SearchStub(chan)
    yield stub
@contextmanager
def references_session(token):
    """Yield a References stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    stub = references_pb2_grpc.ReferencesStub(chan)
    yield stub
@contextmanager
def reporting_session(token):
    """Yield a Reporting stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    stub = reporting_pb2_grpc.ReportingStub(chan)
    yield stub
@contextmanager
def events_session(token):
    """Yield an Events stub over an in-process fake channel, authed via token."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    stub = events_pb2_grpc.EventsStub(chan)
    yield stub
@contextmanager
def bugs_session(token=None):
    """Yield a Bugs stub over an in-process fake channel; token is optional since Bugs is open."""
    chan = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    stub = bugs_pb2_grpc.BugsStub(chan)
    yield stub
@contextmanager
def resources_session():
    """Yield a Resources stub over an in-process fake channel (no auth)."""
    chan = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    stub = resources_pb2_grpc.ResourcesStub(chan)
    yield stub
@contextmanager
def media_session(bearer_token):
    """
    Create a fresh Media API for testing, uses the bearer token for media auth
    """
    media_auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[media_auth_interceptor, SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), server)
        server.start()

        # the bearer token is attached per-call on top of local channel creds
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.access_token_call_credentials(bearer_token),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            server.stop(None).wait()
@pytest.fixture(scope="class")
def testconfig():
    """
    Class-scoped fixture that swaps in a known test configuration, restoring the
    original config when the class's tests finish.
    """
    # snapshot the current config so it can be restored afterwards
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config["IN_TEST"] = True

    # core service settings
    config["DEV"] = True
    config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
    config["VERSION"] = "testing_version"
    config["BASE_URL"] = "http://localhost:3000"
    config["BACKEND_BASE_URL"] = "http://localhost:8888"
    config["CONSOLE_BASE_URL"] = "http://localhost:8888"
    config["COOKIE_DOMAIN"] = "localhost"

    # SMS is disabled under test
    config["ENABLE_SMS"] = False
    config["SMS_SENDER_ID"] = "invalid"

    # email sending is disabled; addresses use .invalid so nothing can leak out
    config["ENABLE_EMAIL"] = False
    config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
    config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
    config["NOTIFICATION_PREFIX"] = "[TEST] "
    config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
    config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
    config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"

    # donations/Stripe disabled
    config["ENABLE_DONATIONS"] = False
    config["STRIPE_API_KEY"] = ""
    config["STRIPE_WEBHOOK_SECRET"] = ""
    config["STRIPE_RECURRING_PRODUCT_ID"] = ""

    # strong verification (Iris) disabled
    config["ENABLE_STRONG_VERIFICATION"] = False
    config["IRIS_ID_PUBKEY"] = ""
    config["IRIS_ID_SECRET"] = ""
    # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
    config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    # dummy SMTP settings (email is disabled above anyway)
    config["SMTP_HOST"] = "localhost"
    config["SMTP_PORT"] = 587
    config["SMTP_USERNAME"] = "username"
    config["SMTP_PASSWORD"] = "password"

    # media server settings used by media_session
    config["ENABLE_MEDIA"] = True
    config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
        "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
    )
    config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
    config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001"
    config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001"

    # bug reporting tool disabled; placeholder GitHub settings
    config["BUG_TOOL_ENABLED"] = False
    config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
    config["BUG_TOOL_GITHUB_USERNAME"] = "user"
    config["BUG_TOOL_GITHUB_TOKEN"] = "token"

    # listmonk (newsletter) disabled
    config["LISTMONK_ENABLED"] = False
    config["LISTMONK_BASE_URL"] = "https://localhost"
    config["LISTMONK_API_KEY"] = "..."
    config["LISTMONK_LIST_UUID"] = "..."

    # push notifications enabled with a test VAPID keypair
    config["PUSH_NOTIFICATIONS_ENABLED"] = True
    config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
    config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"

    yield None

    # restore whatever config was in place before the fixture ran
    config.clear()
    config.update(prevconfig)
@pytest.fixture
def fast_passwords():
    """
    Replace the (deliberately slow) password hashing with a trivial reversible fake,
    so password-related tests run quickly.
    """

    def fast_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def fast_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == fast_hash(password)

    with patch("couchers.crypto.nacl.pwhash.verify", fast_verify), patch(
        "couchers.crypto.nacl.pwhash.str", fast_hash
    ):
        yield
def process_jobs():
    """Run queued background jobs until the queue is drained."""
    while True:
        if not process_job():
            break
@contextmanager
def mock_notification_email():
    """Patch the email queue, yield the mock, and drain background jobs before unpatching."""
    patcher = patch("couchers.email._queue_email")
    mock = patcher.start()
    try:
        yield mock
        # drain the job queue while the patch is still active so queued emails hit the mock
        process_jobs()
    finally:
        patcher.stop()
@dataclass
class EmailData:
    """Snapshot of one queued email's fields, extracted from the mock by email_fields."""

    sender_name: str
    sender_email: str
    recipient: str
    subject: str
    plain: str
    html: str
    source_data: str
    list_unsubscribe_header: str
def email_fields(mock, call_ix=0):
    """Extract the call_ix-th email queued on the mock as an EmailData (missing kwargs become None)."""
    _, kwargs = mock.call_args_list[call_ix]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    return EmailData(**{name: kwargs.get(name) for name in field_names})
@pytest.fixture
def push_collector():
    """
    See test_SendTestPushNotification for an example on how to use this fixture

    Patches the push-notification sender and yields a collector that records every
    push (per user) and offers assertion helpers.
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            # fall back to the stored kwargs for attribute access
            try:
                return self.kwargs[attr]
            except KeyError:
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        """Records (user_id, Push) pairs and provides per-user assertions."""

        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            # returns the Push objects sent to the given user, in order
            return [kwargs for uid, kwargs in self.pushes if uid == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            # signature matches couchers.notifications.push._push_to_user, which this replaces
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            # checks that the ix-th push to the user contains each given field with the given value
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert (
                    push.kwargs[kwarg] == kwargs[kwarg]
                ), f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector