Coverage for src/tests/test_fixtures.py: 98%
513 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
1import os
2from concurrent import futures
3from contextlib import contextmanager
4from dataclasses import dataclass
5from datetime import date, timedelta
6from pathlib import Path
7from unittest.mock import patch
9import grpc
10import pytest
11from sqlalchemy.orm import close_all_sessions
12from sqlalchemy.sql import or_, text
14from couchers.config import config
15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
16from couchers.crypto import random_hex
17from couchers.db import _get_base_engine, session_scope
18from couchers.descriptor_pool import get_descriptor_pool
19from couchers.interceptors import AuthValidatorInterceptor, SessionInterceptor, _try_get_and_update_user_details
20from couchers.jobs.worker import process_job
21from couchers.models import (
22 Base,
23 FriendRelationship,
24 FriendStatus,
25 HostingStatus,
26 Language,
27 LanguageAbility,
28 LanguageFluency,
29 MeetupStatus,
30 PassportSex,
31 Region,
32 RegionLived,
33 RegionVisited,
34 StrongVerificationAttempt,
35 StrongVerificationAttemptStatus,
36 Upload,
37 User,
38 UserBlock,
39 UserSession,
40)
41from couchers.servicers.account import Account, Iris
42from couchers.servicers.admin import Admin
43from couchers.servicers.api import API
44from couchers.servicers.auth import Auth, create_session
45from couchers.servicers.blocking import Blocking
46from couchers.servicers.bugs import Bugs
47from couchers.servicers.communities import Communities
48from couchers.servicers.conversations import Conversations
49from couchers.servicers.discussions import Discussions
50from couchers.servicers.donations import Donations, Stripe
51from couchers.servicers.events import Events
52from couchers.servicers.gis import GIS
53from couchers.servicers.groups import Groups
54from couchers.servicers.jail import Jail
55from couchers.servicers.media import Media, get_media_auth_interceptor
56from couchers.servicers.notifications import Notifications
57from couchers.servicers.pages import Pages
58from couchers.servicers.references import References
59from couchers.servicers.reporting import Reporting
60from couchers.servicers.requests import Requests
61from couchers.servicers.resources import Resources
62from couchers.servicers.search import Search
63from couchers.servicers.threads import Threads
64from couchers.sql import couchers_select as select
65from couchers.utils import create_coordinate, now
66from proto import (
67 account_pb2_grpc,
68 admin_pb2_grpc,
69 annotations_pb2,
70 api_pb2_grpc,
71 auth_pb2_grpc,
72 blocking_pb2_grpc,
73 bugs_pb2_grpc,
74 communities_pb2_grpc,
75 conversations_pb2_grpc,
76 discussions_pb2_grpc,
77 donations_pb2_grpc,
78 events_pb2_grpc,
79 gis_pb2_grpc,
80 groups_pb2_grpc,
81 iris_pb2_grpc,
82 jail_pb2_grpc,
83 media_pb2_grpc,
84 notifications_pb2_grpc,
85 pages_pb2_grpc,
86 references_pb2_grpc,
87 reporting_pb2_grpc,
88 requests_pb2_grpc,
89 resources_pb2_grpc,
90 search_pb2_grpc,
91 stripe_pb2_grpc,
92 threads_pb2_grpc,
93)
def drop_all():
    """
    Drop everything currently in the database, then recreate the empty
    `public` and `logging` schemas and the required extensions.
    """
    # postgis is required for all the Geographic Information System (GIS) stuff
    # pg_trgm is required for trigram based search
    # btree_gist is required for gist-based exclusion constraints
    reset_statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA public;",
        "CREATE SCHEMA logging;",
        "CREATE EXTENSION postgis;",
        "CREATE EXTENSION pg_trgm;",
        "CREATE EXTENSION btree_gist;",
    )
    # sent as one multi-statement string, exactly as a single execute() call
    reset_sql = text("".join(reset_statements))

    with session_scope() as session:
        session.execute(reset_sql)

    # Disposing of the engine resets the database connection pool, which caches some stuff postgres-side about
    # objects and will otherwise sometimes error out with
    # "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585" and similar errors
    engine = _get_base_engine()
    engine.dispose()

    close_all_sessions()
def create_schema_from_models():
    """
    Build the whole schema directly from the current SQLAlchemy models
    (`Base.metadata.create_all`) rather than by replaying migrations.
    """
    # the slugify function lives in slugify.sql next to this file and is created first
    slugify_sql_path = Path(__file__).parent / "slugify.sql"
    with open(slugify_sql_path) as sql_file:
        with session_scope() as session:
            slugify_sql = sql_file.read()
            session.execute(text(slugify_sql))

    engine = _get_base_engine()
    Base.metadata.create_all(engine)
def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database

    Adds a fixed set of regions and languages to `session` and executes the
    SQL found in resources/timezone_areas.sql-fake. Nothing is committed here.
    """
    region_rows = (
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    )

    language_rows = (
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    )

    # read the timezone SQL up front so a missing file fails before anything is added to the session
    tz_sql_path = Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake"
    with open(tz_sql_path, "r") as tz_file:
        tz_sql = tz_file.read()

    for region_code, region_name in region_rows:
        session.add(Region(code=region_code, name=region_name))

    for language_code, language_name in language_rows:
        session.add(Language(code=language_code, name=language_name))

    session.execute(text(tz_sql))
def recreate_database():
    """
    Rebuild the (already running) Postgres database from scratch: drop everything,
    create the schema with metadata.create_all(), then load the testing resources.
    """
    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # 1. drop everything currently in the database
    drop_all()

    # 2. create everything from the current models, not incrementally through migrations
    create_schema_from_models()

    # 3. fill in the regions, languages and timezone data the tests rely on
    with session_scope() as resources_session:
        populate_testing_resources(resources_session)
@pytest.fixture()
def db():
    """
    Pytest fixture giving each test a freshly rebuilt database.

    Connects to a running Postgres database and builds it via recreate_database()
    (which uses metadata.create_all()). The fixture itself provides no value.
    """
    recreate_database()
def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs):
    """
    Create a new user plus a login session, and return them as `(user, token)`

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Options:
    * delete_user: mark the user as deleted (done after the session token has been created)
    * complete_profile: give the user an avatar upload and a long "about me" text
    * strong_verification: add a succeeded strong verification attempt for the user
    * any other keyword argument overrides (or adds to) the default `User` fields below

    Use this most of the time
    """
    # NOTE(review): `auth` is never used below; kept so this function does exactly what it did before
    auth = Auth()

    class _DummyContext:
        """Minimal stand-in for a gRPC context: create_session is handed empty invocation metadata."""

        def invocation_metadata(self):
            return {}

    with session_scope() as session:
        username = "test_user_" + random_hex(16)
        defaults = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "Woman",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied values win over the defaults
        user = User(**{**defaults, **kwargs})
        session.add(user)
        # flush so that user.id is available for the rows below
        session.flush()

        for visited_code in ("CHE", "REU", "FIN"):
            session.add(RegionVisited(user_id=user.id, region_code=visited_code))

        for lived_code in ("ESP", "FRA", "EST"):
            session.add(RegionLived(user_id=user.id, region_code=lived_code))

        for language_code, fluency in (("fin", LanguageFluency.fluent), ("fra", LanguageFluency.beginner)):
            session.add(LanguageAbility(user_id=user.id, language_code=language_code, fluency=fluency))

        # this expires the user, so now it's "dirty"
        session.commit()

        token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            avatar_key = random_hex(32)
            avatar_filename = random_hex(32) + ".jpg"
            avatar_upload = Upload(
                key=avatar_key,
                filename=avatar_filename,
                creator_user_id=user.id,
            )
            session.add(avatar_upload)
            session.flush()
            user.avatar_key = avatar_key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            sex_by_gender = {"Woman": PassportSex.female, "Man": PassportSex.male}
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex=sex_by_gender.get(user.gender, PassportSex.unspecified),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # detach the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session, username):
    """
    Look up the user called `username` and return `(user_id, token)`, where the token
    comes from that user's session row. Exactly one user and one session must exist
    (both queries use `scalar_one()`).
    """
    user_query = select(User).where(User.username == username)
    user_id = session.execute(user_query).scalar_one().id

    session_query = select(UserSession).where(UserSession.user_id == user_id)
    token = session.execute(session_query).scalar_one().token

    return user_id, token
def make_friends(user1, user2):
    """
    Add an already-accepted friend relationship from `user1` to `user2`.
    """
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1, user2):
    """
    Record that `user1` blocks `user2`, and commit it.
    """
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()
def make_user_invisible(user_id):
    """
    Set `is_banned` on the user with the given id (the user must exist).
    """
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the friend relationship between `user1` and `user2`, in either direction,
    or None if there is none.

    The returned object is expunged from the session so it can be used after this
    function returns. Raises if more than one matching row exists (`scalar_one_or_none()`).
    """
    with session_scope() as session:
        # These conditions must be combined with `&`, not Python's `and`: `and` asks for the
        # truthiness of the first comparison and keeps only one operand, so the second
        # condition never reaches the SQL.
        sent_by_user1 = (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id)
        sent_by_user2 = (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id)

        friend_relationship = session.execute(
            select(FriendRelationship).where(or_(sent_by_user1, sent_by_user2))
        ).scalar_one_or_none()

        # nothing to detach when the users have no relationship
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Auth metadata plugin that attaches the session cookie to calls, i.e. it
    injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token):
        # the session token to send as the value of the couchers-sesh cookie
        self.token = token

    def __call__(self, context, callback):
        cookie_header = ("cookie", f"couchers-sesh={self.token}")
        # second argument is the error slot; None means no error
        callback((cookie_header,), None)
@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields `(auth_stub, metadata_interceptor)`. After each unary call made through the stub,
    `metadata_interceptor.latest_headers` holds the call's initial metadata as a dict and
    `metadata_interceptor.latest_header_raw` holds it as returned by gRPC. Both are empty
    before the first call.

    `grpc_channel_options` is passed through as the `options` of the client channel.
    """

    class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
        """Client interceptor that remembers the initial metadata of the most recent call."""

        def __init__(self):
            self.latest_headers = {}
            # initialised here as well, so reading it before the first call doesn't raise AttributeError
            self.latest_header_raw = ()

        def intercept_unary_unary(self, continuation, client_call_details, request):
            call = continuation(client_call_details, request)
            # fetch the metadata once and store it in both forms
            initial_metadata = call.initial_metadata()
            self.latest_headers = dict(initial_metadata)
            self.latest_header_raw = initial_metadata
            return call

    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        # port 0 lets the OS pick a free port, which add_secure_port returns
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:
                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            # always shut the server down, even if the test body raised
            server.stop(None).wait()
@contextmanager
def api_session(token):
    """
    Yield an API stub for testing, backed by an in-process fake channel
    that is authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = API()
    api_pb2_grpc.add_APIServicer_to_server(servicer, fake)
    stub = api_pb2_grpc.APIStub(fake)
    yield stub
@contextmanager
def real_api_session(token):
    """
    Yield an API stub for testing that talks to a real gRPC server over TCP sockets.

    Every call carries `token` as the session cookie (via CookieMetadataPlugin).
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server_interceptors = [AuthValidatorInterceptor(), SessionInterceptor()]
        grpc_server = grpc.server(pool, interceptors=server_interceptors)
        # port 0: let the OS choose a free port
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as grpc_channel:
                yield api_pb2_grpc.APIStub(grpc_channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """
    Yield an Admin stub for testing that talks to a real gRPC server over TCP sockets.

    Every call carries `token` as the session cookie (via CookieMetadataPlugin).
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server_interceptors = [AuthValidatorInterceptor(), SessionInterceptor()]
        grpc_server = grpc.server(pool, interceptors=server_interceptors)
        # port 0: let the OS choose a free port
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as grpc_channel:
                yield admin_pb2_grpc.AdminStub(grpc_channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_account_session(token):
    """
    Yield an Account stub for testing that talks to a real gRPC server over TCP sockets.

    Every call carries `token` as the session cookie (via CookieMetadataPlugin).
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server_interceptors = [AuthValidatorInterceptor(), SessionInterceptor()]
        grpc_server = grpc.server(pool, interceptors=server_interceptors)
        # port 0: let the OS choose a free port
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as grpc_channel:
                yield account_pb2_grpc.AccountStub(grpc_channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_jail_session(token):
    """
    Yield a Jail stub for testing that talks to a real gRPC server over TCP sockets.

    Every call carries `token` as the session cookie (via CookieMetadataPlugin).
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server_interceptors = [AuthValidatorInterceptor(), SessionInterceptor()]
        grpc_server = grpc.server(pool, interceptors=server_interceptors)
        # port 0: let the OS choose a free port
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as grpc_channel:
                yield jail_pb2_grpc.JailStub(grpc_channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def gis_session(token):
    """
    Yield a GIS stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = GIS()
    gis_pb2_grpc.add_GISServicer_to_server(servicer, fake)
    stub = gis_pb2_grpc.GISStub(fake)
    yield stub
class FakeRpcError(grpc.RpcError):
    """
    The error raised by FakeChannel.abort: an RpcError that exposes the
    status code and details through `code()` and `details()`.
    """

    def __init__(self, code, details):
        self._code, self._details = code, details

    def code(self):
        """The status code the call was aborted with."""
        return self._code

    def details(self):
        """The details the call was aborted with."""
        return self._details
def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """
    Assert that a user with the given details may call `method`, based on the
    `auth_level` option of the service the method belongs to.

    * no user: only AUTH_LEVEL_OPEN services may be called
    * AUTH_LEVEL_ADMIN services require a superuser
    * jailed users may only call AUTH_LEVEL_OPEN / AUTH_LEVEL_JAILED services

    `token_expiry` is accepted but not inspected here.
    """
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, _method_name = method.split("/")

    service_descriptor = get_descriptor_pool().FindServiceByName(service_name)
    auth_level = service_descriptor.GetOptions().Extensions[annotations_pb2.auth_level]

    known_levels = [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in known_levels

    if not user_id:
        # unauthenticated callers only get the open services
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
        return

    assert auth_level != annotations_pb2.AUTH_LEVEL_ADMIN or is_superuser, (
        "Non-superuser tried to call superuser API"
    )
    assert not is_jailed or auth_level in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED], (
        "User is jailed but tried to call non-open/non-jailed API"
    )
class FakeChannel:
    """
    In-process replacement for a gRPC server + channel pair.

    The same object is handed to `add_*Servicer_to_server` (which registers the handlers
    through `add_generic_rpc_handlers`) and to the `*Stub` constructors (which fetch callables
    through `unary_unary`). It is also passed to the handlers as their context, which is why
    it carries `user_id` and `abort`.
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        # full method name -> method handler
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def abort(self, code, details):
        """Fail the call by raising a FakeRpcError carrying `code` and `details`."""
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Validate the handlers and register the method handlers of the first one."""
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        first_handler = generic_rpc_handlers[0]
        self.handlers.update(first_handler._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        """
        Return a callable that invokes the handler registered for `uri`.

        The permission check happens here, i.e. when the callable is requested.
        """
        handler = self.handlers[uri]

        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            wire_request = request_serializer(request)
            request = handler.request_deserializer(wire_request)

            with session_scope() as session:
                response = handler.unary_unary(request, self, session)

            wire_response = handler.response_serializer(response)
            return response_deserializer(wire_response)

        return fake_handler
def fake_channel(token=None):
    """
    Build a FakeChannel. With a token, the channel carries the user details that
    `_try_get_and_update_user_details` resolves for it; without one it is anonymous.
    """
    if not token:
        return FakeChannel()

    details = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    user_id, is_jailed, is_superuser, token_expiry = details
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)
@contextmanager
def conversations_session(token):
    """
    Yield a Conversations stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Conversations()
    conversations_pb2_grpc.add_ConversationsServicer_to_server(servicer, fake)
    stub = conversations_pb2_grpc.ConversationsStub(fake)
    yield stub
@contextmanager
def requests_session(token):
    """
    Yield a Requests stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Requests()
    requests_pb2_grpc.add_RequestsServicer_to_server(servicer, fake)
    stub = requests_pb2_grpc.RequestsStub(fake)
    yield stub
@contextmanager
def threads_session(token):
    """
    Yield a Threads stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Threads()
    threads_pb2_grpc.add_ThreadsServicer_to_server(servicer, fake)
    stub = threads_pb2_grpc.ThreadsStub(fake)
    yield stub
@contextmanager
def discussions_session(token):
    """
    Yield a Discussions stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Discussions()
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(servicer, fake)
    stub = discussions_pb2_grpc.DiscussionsStub(fake)
    yield stub
@contextmanager
def donations_session(token):
    """
    Yield a Donations stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Donations()
    donations_pb2_grpc.add_DonationsServicer_to_server(servicer, fake)
    stub = donations_pb2_grpc.DonationsStub(fake)
    yield stub
@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub for testing that talks to a real gRPC server over TCP sockets.

    No call credentials are attached; the channel only uses local channel credentials.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server_interceptors = [AuthValidatorInterceptor(), SessionInterceptor()]
        grpc_server = grpc.server(pool, interceptors=server_interceptors)
        # port 0: let the OS choose a free port
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), grpc_server)
        grpc_server.start()

        channel_creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as grpc_channel:
                yield stripe_pb2_grpc.StripeStub(grpc_channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_iris_session():
    """
    Yield an Iris stub for testing that talks to a real gRPC server over TCP sockets.

    No call credentials are attached; the channel only uses local channel credentials.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        server_interceptors = [AuthValidatorInterceptor(), SessionInterceptor()]
        grpc_server = grpc.server(pool, interceptors=server_interceptors)
        # port 0: let the OS choose a free port
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), grpc_server)
        grpc_server.start()

        channel_creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as grpc_channel:
                yield iris_pb2_grpc.IrisStub(grpc_channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def pages_session(token):
    """
    Yield a Pages stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Pages()
    pages_pb2_grpc.add_PagesServicer_to_server(servicer, fake)
    stub = pages_pb2_grpc.PagesStub(fake)
    yield stub
@contextmanager
def communities_session(token):
    """
    Yield a Communities stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Communities()
    communities_pb2_grpc.add_CommunitiesServicer_to_server(servicer, fake)
    stub = communities_pb2_grpc.CommunitiesStub(fake)
    yield stub
@contextmanager
def groups_session(token):
    """
    Yield a Groups stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Groups()
    groups_pb2_grpc.add_GroupsServicer_to_server(servicer, fake)
    stub = groups_pb2_grpc.GroupsStub(fake)
    yield stub
@contextmanager
def blocking_session(token):
    """
    Yield a Blocking stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Blocking()
    blocking_pb2_grpc.add_BlockingServicer_to_server(servicer, fake)
    stub = blocking_pb2_grpc.BlockingStub(fake)
    yield stub
@contextmanager
def notifications_session(token):
    """
    Yield a Notifications stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Notifications()
    notifications_pb2_grpc.add_NotificationsServicer_to_server(servicer, fake)
    stub = notifications_pb2_grpc.NotificationsStub(fake)
    yield stub
@contextmanager
def account_session(token):
    """
    Yield an Account stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Account()
    account_pb2_grpc.add_AccountServicer_to_server(servicer, fake)
    stub = account_pb2_grpc.AccountStub(fake)
    yield stub
@contextmanager
def search_session(token):
    """
    Yield a Search stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Search()
    search_pb2_grpc.add_SearchServicer_to_server(servicer, fake)
    stub = search_pb2_grpc.SearchStub(fake)
    yield stub
@contextmanager
def references_session(token):
    """
    Yield a References stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = References()
    references_pb2_grpc.add_ReferencesServicer_to_server(servicer, fake)
    stub = references_pb2_grpc.ReferencesStub(fake)
    yield stub
@contextmanager
def reporting_session(token):
    """
    Yield a Reporting stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Reporting()
    reporting_pb2_grpc.add_ReportingServicer_to_server(servicer, fake)
    stub = reporting_pb2_grpc.ReportingStub(fake)
    yield stub
@contextmanager
def events_session(token):
    """
    Yield an Events stub for testing, backed by a fake channel authenticated with `token`
    """
    fake = fake_channel(token)
    servicer = Events()
    events_pb2_grpc.add_EventsServicer_to_server(servicer, fake)
    stub = events_pb2_grpc.EventsStub(fake)
    yield stub
@contextmanager
def bugs_session(token=None):
    """
    Yield a Bugs stub for testing, backed by a fake channel.

    The token is optional: without one the fake channel is anonymous.
    """
    fake = fake_channel(token)
    servicer = Bugs()
    bugs_pb2_grpc.add_BugsServicer_to_server(servicer, fake)
    stub = bugs_pb2_grpc.BugsStub(fake)
    yield stub
@contextmanager
def resources_session():
    """
    Yield a Resources stub for testing, backed by an anonymous fake channel (no token)
    """
    fake = fake_channel()
    servicer = Resources()
    resources_pb2_grpc.add_ResourcesServicer_to_server(servicer, fake)
    stub = resources_pb2_grpc.ResourcesStub(fake)
    yield stub
@contextmanager
def media_session(bearer_token):
    """
    Yield a stub for a fresh Media API served by a real gRPC server.

    The bearer token is used on both sides: the server gets the media auth interceptor
    built from it, and the client sends it as access-token call credentials.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[auth_interceptor, SessionInterceptor()])
        # port 0: let the OS choose a free port
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_servicer = Media()
        media_pb2_grpc.add_MediaServicer_to_server(media_servicer, grpc_server)
        grpc_server.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as grpc_channel:
                yield media_pb2_grpc.MediaStub(grpc_channel)
        finally:
            grpc_server.stop(None).wait()
@pytest.fixture(scope="class")
def testconfig():
    """
    Class-scoped fixture that overlays fixed testing values on the global `config`
    and puts the previous contents back once the tests are done.
    """
    saved_config = config.copy()
    config.clear()
    config.update(saved_config)

    test_overrides = {
        "IN_TEST": True,
        "DEV": True,
        "SECRET": bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b"),
        "VERSION": "testing_version",
        "BASE_URL": "http://localhost:3000",
        "BACKEND_BASE_URL": "http://localhost:8888",
        "CONSOLE_BASE_URL": "http://localhost:8888",
        "COOKIE_DOMAIN": "localhost",
        # sms
        "ENABLE_SMS": False,
        "SMS_SENDER_ID": "invalid",
        # email
        "ENABLE_EMAIL": False,
        "NOTIFICATION_EMAIL_SENDER": "Couchers.org",
        "NOTIFICATION_EMAIL_ADDRESS": "notify@couchers.org.invalid",
        "NOTIFICATION_PREFIX": "[TEST] ",
        "REPORTS_EMAIL_RECIPIENT": "reports@couchers.org.invalid",
        "CONTRIBUTOR_FORM_EMAIL_RECIPIENT": "forms@couchers.org.invalid",
        "MODS_EMAIL_RECIPIENT": "mods@couchers.org.invalid",
        # donations
        "ENABLE_DONATIONS": False,
        "STRIPE_API_KEY": "",
        "STRIPE_WEBHOOK_SECRET": "",
        "STRIPE_RECURRING_PRODUCT_ID": "",
        # strong verification
        "ENABLE_STRONG_VERIFICATION": False,
        "IRIS_ID_PUBKEY": "",
        "IRIS_ID_SECRET": "",
        # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
        "VERIFICATION_DATA_PUBLIC_KEY": bytes.fromhex(
            "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
        ),
        # smtp
        "SMTP_HOST": "localhost",
        "SMTP_PORT": 587,
        "SMTP_USERNAME": "username",
        "SMTP_PASSWORD": "password",
        # media
        "ENABLE_MEDIA": True,
        "MEDIA_SERVER_SECRET_KEY": bytes.fromhex(
            "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
        ),
        "MEDIA_SERVER_BEARER_TOKEN": "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e",
        "MEDIA_SERVER_BASE_URL": "http://localhost:5001",
        "MEDIA_SERVER_UPLOAD_BASE_URL": "http://localhost:5001",
        # bug tool
        "BUG_TOOL_ENABLED": False,
        "BUG_TOOL_GITHUB_REPO": "org/repo",
        "BUG_TOOL_GITHUB_USERNAME": "user",
        "BUG_TOOL_GITHUB_TOKEN": "token",
        # listmonk
        "LISTMONK_ENABLED": False,
        "LISTMONK_BASE_URL": "https://localhost",
        "LISTMONK_API_USERNAME": "...",
        "LISTMONK_API_KEY": "...",
        "LISTMONK_LIST_ID": 3,
        # push notifications
        "PUSH_NOTIFICATIONS_ENABLED": True,
        "PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY": "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI",
        "PUSH_NOTIFICATIONS_VAPID_SUBJECT": "mailto:testing@couchers.org.invalid",
    }
    config.update(test_overrides)

    yield

    # teardown: restore exactly what was there before
    config.clear()
    config.update(saved_config)
@pytest.fixture
def fast_passwords():
    """
    Swap the password hashing/verification functions for trivial fakes while the test runs.
    """
    # password hashing, by design, takes a lot of time, which slows down the tests. here we jump through some hoops to
    # make this fast by removing the hashing step
    fake_prefix = b"fake hash:"

    def fast_hash(password: bytes) -> bytes:
        return fake_prefix + password

    def fast_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == fast_hash(password)

    verify_patch = patch("couchers.crypto.nacl.pwhash.verify", fast_verify)
    hash_patch = patch("couchers.crypto.nacl.pwhash.str", fast_hash)
    with verify_patch, hash_patch:
        yield
def process_jobs():
    """
    Keep calling process_job() until it returns something falsy.
    """
    while True:
        if not process_job():
            break
@contextmanager
def mock_notification_email():
    """
    Patch `couchers.email._queue_email` and yield the mock.

    When the `with` body finishes normally, pending jobs are processed while the patch
    is still active.
    """
    with patch("couchers.email._queue_email") as queue_email_mock:
        yield queue_email_mock
        process_jobs()
@dataclass
class EmailData:
    """
    The keyword arguments of one mocked email-queueing call, as collected by email_fields().
    """

    # who the email is from
    sender_name: str
    sender_email: str
    # who it goes to
    recipient: str
    # content
    subject: str
    plain: str
    html: str
    # remaining keyword arguments, stored as passed
    source_data: str
    list_unsubscribe_header: str
def email_fields(mock, call_ix=0):
    """
    Turn the keyword arguments of the `call_ix`-th recorded call on `mock` into an EmailData.

    Keyword arguments that were not passed in that call come out as None.
    """
    # each entry of call_args_list is an (args, kwargs) pair; only the kwargs are needed
    call_kwargs = mock.call_args_list[call_ix][1]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    return EmailData(**{field_name: call_kwargs.get(field_name) for field_name in field_names})
@pytest.fixture
def push_collector():
    """
    Replaces `couchers.notifications.push._push_to_user` with a collector that records
    every push instead, and yields that collector.

    See test_SendTestPushNotification for an example on how to use this fixture
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            # only reached when normal attribute lookup fails, so fall back to the push fields
            if attr in self.kwargs:
                return self.kwargs[attr]
            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            shown_fields = [f"'{key}'='{val}'" for key, val in self.kwargs.items()]
            kwargs_disp = ", ".join(shown_fields)
            return f"Push({kwargs_disp})"

    class PushCollector:
        """Records pushes as (user_id, Push) pairs and offers assertions over them."""

        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            """All pushes recorded for `user_id`, in the order they were recorded."""
            return [push for recipient_id, push in self.pushes if recipient_id == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            """Stand-in for the patched function: just remember the push."""
            recorded = (user_id, Push(kwargs=kwargs))
            self.pushes.append(recorded)

        def assert_user_has_count(self, user_id, count):
            user_pushes = self.by_user(user_id)
            assert len(user_pushes) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            """Check that the `ix`-th push for the user has every given field with the given value."""
            push = self.by_user(user_id)[ix]
            for field, expected in kwargs.items():
                assert field in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{field}'"
                assert push.kwargs[field] == expected, (
                    f"Push notification {user_id=}, {ix=} mismatch in field '{field}', expected '{expected}' but got '{push.kwargs[field]}'"
                )

        def assert_user_has_single_matching(self, user_id, **kwargs):
            """Check that the user has exactly one push and that it matches the given fields."""
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector