Coverage for src/tests/test_fixtures.py: 98%
522 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import os
2from concurrent import futures
3from contextlib import contextmanager
4from dataclasses import dataclass
5from datetime import date, timedelta
6from pathlib import Path
7from unittest.mock import patch
9import grpc
10import pytest
11from sqlalchemy.orm import close_all_sessions
12from sqlalchemy.sql import or_, text
14from couchers.config import config
15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
16from couchers.crypto import random_hex
17from couchers.db import _get_base_engine, session_scope
18from couchers.descriptor_pool import get_descriptor_pool
19from couchers.interceptors import (
20 AuthValidatorInterceptor,
21 CookieInterceptor,
22 SessionInterceptor,
23 _try_get_and_update_user_details,
24)
25from couchers.jobs.worker import process_job
26from couchers.models import (
27 Base,
28 FriendRelationship,
29 FriendStatus,
30 HostingStatus,
31 Language,
32 LanguageAbility,
33 LanguageFluency,
34 MeetupStatus,
35 PassportSex,
36 Region,
37 RegionLived,
38 RegionVisited,
39 StrongVerificationAttempt,
40 StrongVerificationAttemptStatus,
41 Upload,
42 User,
43 UserBlock,
44 UserSession,
45)
46from couchers.servicers.account import Account, Iris
47from couchers.servicers.admin import Admin
48from couchers.servicers.api import API
49from couchers.servicers.auth import Auth, create_session
50from couchers.servicers.blocking import Blocking
51from couchers.servicers.bugs import Bugs
52from couchers.servicers.communities import Communities
53from couchers.servicers.conversations import Conversations
54from couchers.servicers.discussions import Discussions
55from couchers.servicers.donations import Donations, Stripe
56from couchers.servicers.events import Events
57from couchers.servicers.gis import GIS
58from couchers.servicers.groups import Groups
59from couchers.servicers.jail import Jail
60from couchers.servicers.media import Media, get_media_auth_interceptor
61from couchers.servicers.notifications import Notifications
62from couchers.servicers.pages import Pages
63from couchers.servicers.public import Public
64from couchers.servicers.references import References
65from couchers.servicers.reporting import Reporting
66from couchers.servicers.requests import Requests
67from couchers.servicers.resources import Resources
68from couchers.servicers.search import Search
69from couchers.servicers.threads import Threads
70from couchers.sql import couchers_select as select
71from couchers.utils import create_coordinate, now
72from proto import (
73 account_pb2_grpc,
74 admin_pb2_grpc,
75 annotations_pb2,
76 api_pb2_grpc,
77 auth_pb2_grpc,
78 blocking_pb2_grpc,
79 bugs_pb2_grpc,
80 communities_pb2_grpc,
81 conversations_pb2_grpc,
82 discussions_pb2_grpc,
83 donations_pb2_grpc,
84 events_pb2_grpc,
85 gis_pb2_grpc,
86 groups_pb2_grpc,
87 iris_pb2_grpc,
88 jail_pb2_grpc,
89 media_pb2_grpc,
90 notifications_pb2_grpc,
91 pages_pb2_grpc,
92 public_pb2_grpc,
93 references_pb2_grpc,
94 reporting_pb2_grpc,
95 requests_pb2_grpc,
96 resources_pb2_grpc,
97 search_pb2_grpc,
98 stripe_pb2_grpc,
99 threads_pb2_grpc,
100)
def drop_all():
    """
    Wipe the database: drop and recreate the schemas and extensions, then reset all connections
    """
    # postgis is required for all the Geographic Information System (GIS) stuff
    # pg_trgm is required for trigram based search
    # btree_gist is required for gist-based exclusion constraints
    reset_statements = (
        "DROP SCHEMA IF EXISTS public CASCADE;",
        "DROP SCHEMA IF EXISTS logging CASCADE;",
        "DROP EXTENSION IF EXISTS postgis CASCADE;",
        "CREATE SCHEMA public;",
        "CREATE SCHEMA logging;",
        "CREATE EXTENSION postgis;",
        "CREATE EXTENSION pg_trgm;",
        "CREATE EXTENSION btree_gist;",
    )

    with session_scope() as session:
        # sent as one single statement string, exactly as before
        session.execute(text("".join(reset_statements)))

    # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise
    # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
    # and similar errors
    engine = _get_base_engine()
    engine.dispose()

    close_all_sessions()
def create_schema_from_models():
    """
    Build the whole schema straight from the current models rather than
    replaying the migrations one by one.
    """

    # the slugify function is defined in a SQL file that sits next to this module
    slugify_sql_path = Path(__file__).parent / "slugify.sql"
    with open(slugify_sql_path) as sql_file, session_scope() as session:
        slugify_sql = sql_file.read()
        session.execute(text(slugify_sql))

    engine = _get_base_engine()
    Base.metadata.create_all(engine)
def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database

    Adds a fixed set of regions and languages to the given session and runs the fake timezone areas SQL.
    """
    # (code, name) pairs
    region_rows = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    # (code, name) pairs
    language_rows = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # read the SQL up front, before anything is added to the session
    tz_sql_path = Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake"
    with open(tz_sql_path, "r") as tz_file:
        tz_sql = tz_file.read()

    session.add_all(Region(code=region_code, name=region_name) for region_code, region_name in region_rows)
    session.add_all(Language(code=lang_code, name=lang_name) for lang_code, lang_name in language_rows)

    session.execute(text(tz_sql))
def recreate_database():
    """
    Rebuild the running Postgres database from scratch: drop everything, create the schema with
    metadata.create_all(), then load the testing resources
    """

    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # start from an empty database
    drop_all()

    # build the schema from the current models, not incrementally through migrations
    create_schema_from_models()

    with session_scope() as db_session:
        populate_testing_resources(db_session)
@pytest.fixture()
def db():
    """
    Pytest fixture giving each test a freshly rebuilt database (see recreate_database)
    """

    recreate_database()
def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs):
    """
    Create a new user together with a session for them, returns the pair (user, token)

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Any extra keyword arguments override the default column values used to build the User.

    If delete_user is set, the user is marked as deleted after their session has been created. If complete_profile is
    set, the user gets an avatar upload and a long about_me. If strong_verification is set, a succeeded
    StrongVerificationAttempt is added for the user.

    Use this most of the time
    """
    auth = Auth()

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "Woman",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied values take precedence over the defaults above
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        # flush so that user.id is available for the rows below
        session.flush()

        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        # minimal stand-in for a gRPC context: create_session only gets an empty invocation metadata from it
        class _DummyContext:
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            # the upload is flushed before the user's avatar_key is pointed at it
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        # sanity check that the profile completeness matches what the caller asked for
        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # this detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session, username):
    """
    Look up the user with the given username and return (user_id, token) for their one session
    """
    user_row = session.execute(select(User).where(User.username == username)).scalar_one()
    user_id = user_row.id
    session_row = session.execute(select(UserSession).where(UserSession.user_id == user_id)).scalar_one()
    return user_id, session_row.token
def make_friends(user1, user2):
    """
    Store an already accepted friend relationship going from user1 to user2
    """
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1, user2):
    """
    Store a block where user1 is the blocking user and user2 the blocked one
    """
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()
def make_user_invisible(user_id):
    """
    Set is_banned on the user with the given id
    """
    with session_scope() as session:
        target_user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        target_user.is_banned = True
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Fetch the friend relationship between user1 and user2, in either direction

    Returns the FriendRelationship detached from the session, or None if the two users have no relationship.
    Raises if more than one matching row exists (scalar_one_or_none).
    """
    with session_scope() as session:
        # the conditions have to be combined with `&`: python's `and` can't be overloaded by SQLAlchemy, so it
        # collapses each pair to just one of the two comparisons and the other user is never checked
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # expunging None raises, so only detach when there actually is a row
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Call-credentials plugin that adds the `cookie: couchers-sesh=...` header to the call metadata
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        cookie_header = ("cookie", f"couchers-sesh={self.token}")
        # a metadata tuple with a single header, and no error
        callback((cookie_header,), None)
@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing, yields (auth stub, metadata interceptor)

    This needs to use the real server since it plays around with headers
    """

    class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
        # keeps hold of the initial metadata of the most recent call so tests can look at the headers
        def __init__(self):
            self.latest_headers = {}

        def intercept_unary_unary(self, continuation, client_call_details, request):
            call = continuation(client_call_details, request)
            self.latest_headers = dict(call.initial_metadata())
            self.latest_header_raw = call.initial_metadata()
            return call

    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), grpc_server)
        grpc_server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{bound_port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as raw_channel:
                metadata_interceptor = _MetadataKeeperInterceptor()
                intercepted_channel = grpc.intercept_channel(raw_channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(intercepted_channel), metadata_interceptor
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def api_session(token):
    """
    Yield an API stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake)
    yield api_pb2_grpc.APIStub(fake)
@contextmanager
def real_api_session(token):
    """
    Yield an API stub talking to a real gRPC server over TCP sockets, authenticated with the given token
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), grpc_server)
        grpc_server.start()

        # the session cookie is attached to every call on top of the local channel credentials
        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """
    Yield an Admin stub talking to a real gRPC server over TCP sockets, authenticated with the given token
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), grpc_server)
        grpc_server.start()

        # the session cookie is attached to every call on top of the local channel credentials
        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_account_session(token):
    """
    Yield an Account stub talking to a real gRPC server over TCP sockets, authenticated with the given token
    """
    with futures.ThreadPoolExecutor(1) as pool:
        # unlike the other real sessions, this server also runs the CookieInterceptor
        server_interceptors = [AuthValidatorInterceptor(), CookieInterceptor(), SessionInterceptor()]
        grpc_server = grpc.server(pool, interceptors=server_interceptors)
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), grpc_server)
        grpc_server.start()

        # the session cookie is attached to every call on top of the local channel credentials
        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_jail_session(token):
    """
    Yield a Jail stub talking to a real gRPC server over TCP sockets, authenticated with the given token
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), grpc_server)
        grpc_server.start()

        # the session cookie is attached to every call on top of the local channel credentials
        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def gis_session(token):
    """
    Yield a GIS stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake)
    yield gis_pb2_grpc.GISStub(fake)
@contextmanager
def public_session():
    """
    Yield a Public stub running on a fake channel with no token
    """
    fake = fake_channel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake)
    yield public_pb2_grpc.PublicStub(fake)
class FakeRpcError(grpc.RpcError):
    """
    RpcError raised by the fake channel's abort(), exposes the status code and details it was given
    """

    def __init__(self, code, details):
        self._code, self._details = code, details

    def code(self):
        return self._code

    def details(self):
        return self._details
def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """
    Assert that a user with the given details may call `method`, based on the auth level of its service

    The auth level is read from the service options in the descriptor pool. token_expiry is accepted but not
    looked at.
    """
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, _method_name = method.split("/")

    service_descriptor = get_descriptor_pool().FindServiceByName(service_name)
    auth_level = service_descriptor.GetOptions().Extensions[annotations_pb2.auth_level]
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    # without a user only open services may be called
    if not user_id:
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
        return

    assert auth_level != annotations_pb2.AUTH_LEVEL_ADMIN or is_superuser, (
        "Non-superuser tried to call superuser API"
    )
    assert not is_jailed or auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
    ], "User is jailed but tried to call non-open/non-jailed API"
class FakeChannel:
    """
    In-process stand-in for both a gRPC server and a gRPC channel

    Servicers get registered on it via add_generic_rpc_handlers, and stubs built on it call the servicer methods
    directly. It is also handed to the servicer methods as the context, hence abort().
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        # method uri -> handler
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def abort(self, code, details):
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        # only the method handlers of the first generic handler get registered
        first_generic_handler = generic_rpc_handlers[0]
        self.handlers.update(first_generic_handler._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        method_handler = self.handlers[uri]

        # permissions are checked when the stub method is set up, not on each call
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            wire_request = request_serializer(request)
            request = method_handler.request_deserializer(wire_request)

            with session_scope() as session:
                response = method_handler.unary_unary(request, self, session)

            wire_response = method_handler.response_serializer(response)
            return response_deserializer(wire_response)

        return fake_handler
def fake_channel(token=None):
    """
    Build a FakeChannel, for the user the token belongs to if a token is given, otherwise an anonymous one
    """
    if not token:
        return FakeChannel()

    user_id, is_jailed, is_superuser, token_expiry, _ui_language_preference = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)
@contextmanager
def conversations_session(token):
    """
    Yield a Conversations stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake)
    yield conversations_pb2_grpc.ConversationsStub(fake)
@contextmanager
def requests_session(token):
    """
    Yield a Requests stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake)
    yield requests_pb2_grpc.RequestsStub(fake)
@contextmanager
def threads_session(token):
    """
    Yield a Threads stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake)
    yield threads_pb2_grpc.ThreadsStub(fake)
@contextmanager
def discussions_session(token):
    """
    Yield a Discussions stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake)
    yield discussions_pb2_grpc.DiscussionsStub(fake)
@contextmanager
def donations_session(token):
    """
    Yield a Donations stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake)
    yield donations_pb2_grpc.DonationsStub(fake)
@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub talking to a real gRPC server over TCP sockets, no token involved
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), grpc_server)
        grpc_server.start()

        # plain local credentials, nothing is attached to the calls
        channel_creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_iris_session():
    """
    Yield an Iris stub talking to a real gRPC server over TCP sockets, no token involved
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), grpc_server)
        grpc_server.start()

        # plain local credentials, nothing is attached to the calls
        channel_creds = grpc.local_channel_credentials()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def pages_session(token):
    """
    Yield a Pages stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake)
    yield pages_pb2_grpc.PagesStub(fake)
@contextmanager
def communities_session(token):
    """
    Yield a Communities stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake)
    yield communities_pb2_grpc.CommunitiesStub(fake)
@contextmanager
def groups_session(token):
    """
    Yield a Groups stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake)
    yield groups_pb2_grpc.GroupsStub(fake)
@contextmanager
def blocking_session(token):
    """
    Yield a Blocking stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake)
    yield blocking_pb2_grpc.BlockingStub(fake)
@contextmanager
def notifications_session(token):
    """
    Yield a Notifications stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake)
    yield notifications_pb2_grpc.NotificationsStub(fake)
@contextmanager
def account_session(token):
    """
    Yield an Account stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake)
    yield account_pb2_grpc.AccountStub(fake)
@contextmanager
def search_session(token):
    """
    Yield a Search stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake)
    yield search_pb2_grpc.SearchStub(fake)
@contextmanager
def references_session(token):
    """
    Yield a References stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake)
    yield references_pb2_grpc.ReferencesStub(fake)
@contextmanager
def reporting_session(token):
    """
    Yield a Reporting stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake)
    yield reporting_pb2_grpc.ReportingStub(fake)
@contextmanager
def events_session(token):
    """
    Yield an Events stub running on a fake channel, authenticated with the given token
    """
    fake = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake)
    yield events_pb2_grpc.EventsStub(fake)
@contextmanager
def bugs_session(token=None):
    """
    Yield a Bugs stub running on a fake channel, the token is optional
    """
    fake = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake)
    yield bugs_pb2_grpc.BugsStub(fake)
@contextmanager
def resources_session():
    """
    Yield a Resources stub running on a fake channel with no token
    """
    fake = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake)
    yield resources_pb2_grpc.ResourcesStub(fake)
@contextmanager
def media_session(bearer_token):
    """
    Yield a Media stub talking to a fresh real gRPC server, the bearer token is used for media auth
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[auth_interceptor, SessionInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_servicer = Media()
        media_pb2_grpc.add_MediaServicer_to_server(media_servicer, grpc_server)
        grpc_server.start()

        # the bearer token goes along with every call as access token call credentials
        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            grpc_server.stop(None).wait()
@pytest.fixture(scope="class")
def testconfig():
    """
    Class-scoped fixture that overlays the testing values onto the global config, and puts the previous
    config back once the tests are done
    """
    saved_config = config.copy()
    config.clear()
    config.update(saved_config)

    # applied in the listed order, on top of whatever was in the config already
    config.update(
        {
            "IN_TEST": True,
            "DEV": True,
            "SECRET": bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b"),
            "VERSION": "testing_version",
            "BASE_URL": "http://localhost:3000",
            "BACKEND_BASE_URL": "http://localhost:8888",
            "CONSOLE_BASE_URL": "http://localhost:8888",
            "COOKIE_DOMAIN": "localhost",
            "ENABLE_SMS": False,
            "SMS_SENDER_ID": "invalid",
            "ENABLE_EMAIL": False,
            "NOTIFICATION_EMAIL_SENDER": "Couchers.org",
            "NOTIFICATION_EMAIL_ADDRESS": "notify@couchers.org.invalid",
            "NOTIFICATION_PREFIX": "[TEST] ",
            "REPORTS_EMAIL_RECIPIENT": "reports@couchers.org.invalid",
            "CONTRIBUTOR_FORM_EMAIL_RECIPIENT": "forms@couchers.org.invalid",
            "MODS_EMAIL_RECIPIENT": "mods@couchers.org.invalid",
            "ENABLE_DONATIONS": False,
            "STRIPE_API_KEY": "",
            "STRIPE_WEBHOOK_SECRET": "",
            "STRIPE_RECURRING_PRODUCT_ID": "",
            "ENABLE_STRONG_VERIFICATION": False,
            "IRIS_ID_PUBKEY": "",
            "IRIS_ID_SECRET": "",
            # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
            "VERIFICATION_DATA_PUBLIC_KEY": bytes.fromhex(
                "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
            ),
            "SMTP_HOST": "localhost",
            "SMTP_PORT": 587,
            "SMTP_USERNAME": "username",
            "SMTP_PASSWORD": "password",
            "ENABLE_MEDIA": True,
            "MEDIA_SERVER_SECRET_KEY": bytes.fromhex(
                "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
            ),
            "MEDIA_SERVER_BEARER_TOKEN": "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e",
            "MEDIA_SERVER_BASE_URL": "http://localhost:5001",
            "MEDIA_SERVER_UPLOAD_BASE_URL": "http://localhost:5001",
            "BUG_TOOL_ENABLED": False,
            "BUG_TOOL_GITHUB_REPO": "org/repo",
            "BUG_TOOL_GITHUB_USERNAME": "user",
            "BUG_TOOL_GITHUB_TOKEN": "token",
            "LISTMONK_ENABLED": False,
            "LISTMONK_BASE_URL": "https://localhost",
            "LISTMONK_API_USERNAME": "...",
            "LISTMONK_API_KEY": "...",
            "LISTMONK_LIST_ID": 3,
            "PUSH_NOTIFICATIONS_ENABLED": True,
            "PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY": "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI",
            "PUSH_NOTIFICATIONS_VAPID_SUBJECT": "mailto:testing@couchers.org.invalid",
            "ACTIVENESS_PROBES_ENABLED": True,
        }
    )

    yield None

    # teardown: restore exactly what was there before
    config.clear()
    config.update(saved_config)
def run_migration_test():
    """
    Whether the RUN_MIGRATION_TEST environment variable is set to "true" (in any casing), unset means no
    """
    flag = os.getenv("RUN_MIGRATION_TEST", "false")
    return flag.lower() == "true"
@pytest.fixture
def fast_passwords():
    """
    Fixture that swaps the password hashing and verification used by couchers.crypto for trivial fakes
    """
    # password hashing, by design, takes a lot of time, which slows down the tests. here we jump through some hoops to
    # make this fast by removing the hashing step

    fake_prefix = b"fake hash:"

    def fast_hash(password: bytes) -> bytes:
        return fake_prefix + password

    def fast_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == fast_hash(password)

    verify_patch = patch("couchers.crypto.nacl.pwhash.verify", fast_verify)
    hash_patch = patch("couchers.crypto.nacl.pwhash.str", fast_hash)
    with verify_patch, hash_patch:
        yield
def process_jobs():
    """
    Keep calling process_job() until it returns something falsy
    """
    keep_going = process_job()
    while keep_going:
        keep_going = process_job()
@contextmanager
def mock_notification_email():
    """
    Patch couchers.email._queue_email and yield the mock so that tests can inspect the queued emails

    When the body of the `with` finishes normally, process_jobs() is run before the patch is removed.
    """
    with patch("couchers.email._queue_email") as mock:
        yield mock
        # NOTE(review): presumably the jobs are what end up calling _queue_email, so they have to run while the
        # patch is still in place - confirm before moving this
        process_jobs()
@dataclass
class EmailData:
    """
    The keyword arguments of one call to the email queueing function, see email_fields
    """

    # who the email is from
    sender_name: str
    sender_email: str
    # who it goes to
    recipient: str
    # the content
    subject: str
    plain: str
    html: str
    # extras
    source_data: str
    list_unsubscribe_header: str
def email_fields(mock, call_ix=0):
    """
    Pack the keyword arguments of the call_ix-th call made to the mock into an EmailData

    Any field the call did not pass as a keyword argument comes out as None.
    """
    call_kwargs = mock.call_args_list[call_ix].kwargs
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    return EmailData(**{field_name: call_kwargs.get(field_name) for field_name in field_names})
@pytest.fixture
def push_collector():
    """
    Fixture that collects push notifications instead of sending them, by patching
    couchers.notifications.push._push_to_user

    See test_SendTestPushNotification for an example on how to use this fixture
    """

    class Push:
        """
        Thin wrapper around the push kwargs, so you can write e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            # only gets called for attributes that aren't found the normal way
            try:
                return self.kwargs[attr]
            except KeyError:
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            # all the pushes for one user, in the order they came in
            return [pushed for recipient_id, pushed in self.pushes if recipient_id == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            # stands in for _push_to_user, the session is not needed here
            entry = (user_id, Push(kwargs=kwargs))
            self.pushes.append(entry)

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert push.kwargs[kwarg] == kwargs[kwarg], (
                    f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"
                )

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector