Coverage for src/tests/test_fixtures.py: 99%
548 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 20:25 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 20:25 +0000
1import os
2from concurrent import futures
3from contextlib import contextmanager
4from dataclasses import dataclass
5from datetime import date, timedelta
6from pathlib import Path
7from unittest.mock import patch
9import grpc
10import pytest
11from sqlalchemy.orm import close_all_sessions
from sqlalchemy.sql import and_, or_, text
14from couchers.config import config
15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
16from couchers.crypto import random_hex
17from couchers.db import _get_base_engine, session_scope
18from couchers.descriptor_pool import get_descriptor_pool
19from couchers.interceptors import (
20 CouchersMiddlewareInterceptor,
21 _try_get_and_update_user_details,
22)
23from couchers.jobs.worker import process_job
24from couchers.models import (
25 Base,
26 FriendRelationship,
27 FriendStatus,
28 HostingStatus,
29 Language,
30 LanguageAbility,
31 LanguageFluency,
32 MeetupStatus,
33 ModerationUserList,
34 PassportSex,
35 Region,
36 RegionLived,
37 RegionVisited,
38 StrongVerificationAttempt,
39 StrongVerificationAttemptStatus,
40 Upload,
41 User,
42 UserBlock,
43 UserSession,
44)
45from couchers.servicers.account import Account, Iris
46from couchers.servicers.admin import Admin
47from couchers.servicers.api import API
48from couchers.servicers.auth import Auth, create_session
49from couchers.servicers.blocking import Blocking
50from couchers.servicers.bugs import Bugs
51from couchers.servicers.communities import Communities
52from couchers.servicers.conversations import Conversations
53from couchers.servicers.discussions import Discussions
54from couchers.servicers.donations import Donations, Stripe
55from couchers.servicers.events import Events
56from couchers.servicers.gis import GIS
57from couchers.servicers.groups import Groups
58from couchers.servicers.jail import Jail
59from couchers.servicers.media import Media, get_media_auth_interceptor
60from couchers.servicers.notifications import Notifications
61from couchers.servicers.pages import Pages
62from couchers.servicers.public import Public
63from couchers.servicers.references import References
64from couchers.servicers.reporting import Reporting
65from couchers.servicers.requests import Requests
66from couchers.servicers.resources import Resources
67from couchers.servicers.search import Search
68from couchers.servicers.threads import Threads
69from couchers.sql import couchers_select as select
70from couchers.utils import create_coordinate, now
71from proto import (
72 account_pb2_grpc,
73 admin_pb2_grpc,
74 annotations_pb2,
75 api_pb2_grpc,
76 auth_pb2_grpc,
77 blocking_pb2_grpc,
78 bugs_pb2_grpc,
79 communities_pb2_grpc,
80 conversations_pb2_grpc,
81 discussions_pb2_grpc,
82 donations_pb2_grpc,
83 events_pb2_grpc,
84 gis_pb2_grpc,
85 groups_pb2_grpc,
86 iris_pb2_grpc,
87 jail_pb2_grpc,
88 media_pb2_grpc,
89 notifications_pb2_grpc,
90 pages_pb2_grpc,
91 public_pb2_grpc,
92 references_pb2_grpc,
93 reporting_pb2_grpc,
94 requests_pb2_grpc,
95 resources_pb2_grpc,
96 search_pb2_grpc,
97 stripe_pb2_grpc,
98 threads_pb2_grpc,
99)
def truncate_all_tables():
    """drop everything currently in the database"""
    static_tables = ("regions", "languages", "timezone_areas")
    with session_scope() as session:
        for table in Base.metadata.tables.values():
            # static reference data is seeded once and kept across tests
            if table.name in static_tables:
                continue
            qualified = f'"{table.schema}"."{table.name}"' if table.schema else f'"{table.name}"'
            session.execute(text(f"TRUNCATE TABLE {qualified} RESTART IDENTITY CASCADE"))

    # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise
    # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
    # and similar errors
    _get_base_engine().dispose()

    close_all_sessions()
def create_schema_from_models():
    """
    Create everything from the current models, not incrementally
    through migrations.
    """
    # SQL functions are normally created by migrations; in tests we load them directly
    sql_functions_path = Path(__file__).parent / "sql_functions.sql"
    with session_scope() as session, open(sql_functions_path) as f:
        session.execute(text(f.read()))

    Base.metadata.create_all(_get_base_engine())
def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database
    """
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # a fake, tiny timezone_areas table used instead of the real (huge) one
    with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f:
        tz_sql = f.read()

    session.add_all(Region(code=code, name=name) for code, name in regions)
    session.add_all(Language(code=code, name=name) for code, name in languages)

    session.execute(text(tz_sql))
def drop_database() -> None:
    """Drop and recreate the schemas and database extensions the app relies on."""
    # postgis is required for all the Geographic Information System (GIS) stuff
    # pg_trgm is required for trigram-based search
    # btree_gist is required for gist-based exclusion constraints
    reset_sql = (
        "DROP SCHEMA IF EXISTS public CASCADE;"
        "DROP SCHEMA IF EXISTS logging CASCADE;"
        "DROP EXTENSION IF EXISTS postgis CASCADE;"
        "CREATE SCHEMA public;"
        "CREATE SCHEMA logging;"
        "CREATE EXTENSION postgis;"
        "CREATE EXTENSION pg_trgm;"
        "CREATE EXTENSION btree_gist;"
    )
    with session_scope() as session:
        session.execute(text(reset_sql))
def recreate_database():
    """Drop the database, rebuild the schema from the models, and seed test resources."""
    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    drop_database()

    # create everything from the current models, not incrementally through migrations
    create_schema_from_models()

    with session_scope() as session:
        populate_testing_resources(session)
@pytest.fixture(scope="session")
def create_database():
    """Session-scoped fixture: rebuild the test database once per test run."""
    recreate_database()
@pytest.fixture
def db(create_database):
    """
    Pytest fixture to connect to a running Postgres database and build it using metadata.create_all()

    Resets state by truncating all non-static tables before the test.
    """
    truncate_all_tables()
def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Args:
        delete_user: mark the created user as deleted (done after session creation,
            since a deleted user aborts session creation)
        complete_profile: give the user an avatar and a long about_me so the profile
            counts as complete
        strong_verification: attach a succeeded StrongVerificationAttempt to the user
        **kwargs: override any of the default User column values

    Returns:
        (user, token): a detached User object and a session token for it
    """
    auth = Auth()

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "Woman",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied overrides take precedence over the defaults above
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        session.flush()

        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        # minimal stand-in for a grpc context: create_session only needs `.headers` here
        class _MockCouchersContext:
            @property
            def headers(self):
                return {}

        token, _ = create_session(_MockCouchersContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        # deterministic per-user score: earlier-created users rank higher
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # expunge detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session, username):
    """Look up a user's id and an existing session token by username."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token
def make_friends(user1, user2):
    """Create an accepted friend relationship from user1 to user2."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()
def make_user_invisible(user_id):
    """Flag the user with the given id as banned."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between the two users (in either direction),
    detached from the session, or None if there isn't one.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # NOTE: this previously used the Python `and` operator, which does not build a
                    # SQL conjunction from SQLAlchemy column expressions — `and_()` is required to
                    # actually match both sides of the relationship.
                    and_(FriendRelationship.from_user_id == user1.id, FriendRelationship.to_user_id == user2.id),
                    and_(FriendRelationship.from_user_id == user2.id, FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # expunging None would raise; only detach a real result
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
def add_users_to_new_moderation_list(users):
    """Group users as duplicated accounts"""
    with session_scope() as session:
        user_list = ModerationUserList()
        session.add(user_list)
        session.flush()
        for user in users:
            # re-fetch in this session so the relationship append is tracked
            user_list.users.append(session.get(User, user.id))
        return user_list.id
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        metadata = (("cookie", f"couchers-sesh={self.token}"),)
        callback(metadata, None)
@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields an (AuthStub, metadata_interceptor) pair; the interceptor records the
    initial metadata of the most recent call so tests can inspect response headers.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:

                # keeps a copy of the latest call's initial metadata for inspection by tests
                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    def __init__(self):
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        self.latest_headers = dict(call.initial_metadata())
                        self.latest_header_raw = call.initial_metadata()
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            server.stop(None).wait()
@contextmanager
def api_session(token):
    """
    Create an API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    yield api_pb2_grpc.APIStub(chan)
@contextmanager
def real_api_session(token):
    """
    Create an API for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """
    Create an Admin service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_account_session(token):
    """
    Create an Account service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_jail_session(token):
    """
    Create a Jail service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def gis_session(token):
    """Yield a GIS API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), chan)
    yield gis_pb2_grpc.GISStub(chan)
@contextmanager
def public_session():
    """Yield a Public API stub over an unauthenticated fake in-process channel."""
    chan = fake_channel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), chan)
    yield public_pb2_grpc.PublicStub(chan)
class FakeRpcError(grpc.RpcError):
    """RpcError raised by FakeChannel.abort, carrying a status code and details."""

    def __init__(self, code, details):
        self._code = code
        self._details = details

    def code(self):
        return self._code

    def details(self):
        return self._details
def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """Assert that the calling user may access `method` given its declared auth level."""
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, method_name = method.split("/")

    service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = service_options.Extensions[annotations_pb2.auth_level]
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    if not user_id:
        # logged-out callers may only hit open APIs
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
        return

    assert not (auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser), (
        "Non-superuser tried to call superuser API"
    )
    assert not (
        is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED]
    ), "User is jailed but tried to call non-open/non-jailed API"
class FakeChannel:
    """
    In-process stand-in for a gRPC channel/server pair.

    Servicers register handlers via add_generic_rpc_handlers; unary_unary then
    dispatches calls directly, with a full request/response serialization
    round-trip to catch accidental use of unserializable data.
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def is_logged_in(self):
        return self.user_id is not None

    def abort(self, code, details):
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)
        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        handler = self.handlers[uri]
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def invoke(request):
            # serialize + deserialize both the request and the response to catch
            # accidental use of unserializable data
            wire_request = handler.request_deserializer(request_serializer(request))
            with session_scope() as session:
                response = handler.unary_unary(wire_request, self, session)
            return response_deserializer(handler.response_serializer(response))

        return invoke
def fake_channel(token=None):
    """Build a FakeChannel; with a token, resolve and attach the user's auth details."""
    if not token:
        return FakeChannel()
    user_id, is_jailed, is_superuser, token_expiry, _ui_language_preference = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)
@contextmanager
def conversations_session(token):
    """
    Create a Conversations API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    yield conversations_pb2_grpc.ConversationsStub(chan)
@contextmanager
def requests_session(token):
    """
    Create a Requests API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    yield requests_pb2_grpc.RequestsStub(chan)
@contextmanager
def threads_session(token):
    """Yield a Threads API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    yield threads_pb2_grpc.ThreadsStub(chan)
@contextmanager
def discussions_session(token):
    """Yield a Discussions API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    yield discussions_pb2_grpc.DiscussionsStub(chan)
@contextmanager
def donations_session(token):
    """Yield a Donations API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    yield donations_pb2_grpc.DonationsStub(chan)
@contextmanager
def real_stripe_session():
    """
    Create a Stripe service for testing, using TCP sockets
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        try:
            # no call credentials: the Stripe servicer is hit by webhooks, not logged-in users
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_iris_session():
    """Create an Iris service for testing, using TCP sockets (no auth)."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def pages_session(token):
    """Yield a Pages API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    yield pages_pb2_grpc.PagesStub(chan)
@contextmanager
def communities_session(token):
    """Yield a Communities API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    yield communities_pb2_grpc.CommunitiesStub(chan)
@contextmanager
def groups_session(token):
    """Yield a Groups API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    yield groups_pb2_grpc.GroupsStub(chan)
@contextmanager
def blocking_session(token):
    """Yield a Blocking API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    yield blocking_pb2_grpc.BlockingStub(chan)
@contextmanager
def notifications_session(token):
    """Yield a Notifications API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    yield notifications_pb2_grpc.NotificationsStub(chan)
@contextmanager
def account_session(token):
    """
    Create a Account API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    yield account_pb2_grpc.AccountStub(chan)
@contextmanager
def search_session(token):
    """
    Create a Search API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    yield search_pb2_grpc.SearchStub(chan)
@contextmanager
def references_session(token):
    """
    Create a References API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    yield references_pb2_grpc.ReferencesStub(chan)
@contextmanager
def reporting_session(token):
    """Yield a Reporting API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    yield reporting_pb2_grpc.ReportingStub(chan)
@contextmanager
def events_session(token):
    """Yield an Events API stub over a fake in-process channel, authed with `token`."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    yield events_pb2_grpc.EventsStub(chan)
@contextmanager
def bugs_session(token=None):
    """Yield a Bugs API stub over a fake in-process channel; auth token is optional."""
    chan = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    yield bugs_pb2_grpc.BugsStub(chan)
@contextmanager
def resources_session():
    """Yield a Resources API stub over an unauthenticated fake in-process channel."""
    chan = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    yield resources_pb2_grpc.ResourcesStub(chan)
@contextmanager
def media_session(bearer_token):
    """
    Create a fresh Media API for testing, uses the bearer token for media auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[get_media_auth_interceptor(bearer_token)])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.access_token_call_credentials(bearer_token),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            server.stop(None).wait()
@pytest.fixture(scope="class")
def testconfig():
    """
    Class-scoped fixture that swaps in a known-good testing configuration.

    Snapshots the current config, overwrites it with test values (external
    services disabled, fixed secrets, localhost URLs), and restores the
    original config after the test class finishes.
    """
    # snapshot so the original config can be restored in teardown
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config["IN_TEST"] = True

    config["DEV"] = True
    config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
    config["VERSION"] = "testing_version"
    config["BASE_URL"] = "http://localhost:3000"
    config["BACKEND_BASE_URL"] = "http://localhost:8888"
    config["CONSOLE_BASE_URL"] = "http://localhost:8888"
    config["COOKIE_DOMAIN"] = "localhost"

    # external services are all disabled in tests
    config["ENABLE_SMS"] = False
    config["SMS_SENDER_ID"] = "invalid"

    config["ENABLE_EMAIL"] = False
    config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
    config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
    config["NOTIFICATION_PREFIX"] = "[TEST] "
    config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
    config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
    config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"

    config["ENABLE_DONATIONS"] = False
    config["STRIPE_API_KEY"] = ""
    config["STRIPE_WEBHOOK_SECRET"] = ""
    config["STRIPE_RECURRING_PRODUCT_ID"] = ""

    config["ENABLE_STRONG_VERIFICATION"] = False
    config["IRIS_ID_PUBKEY"] = ""
    config["IRIS_ID_SECRET"] = ""
    # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
    config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    config["SMTP_HOST"] = "localhost"
    config["SMTP_PORT"] = 587
    config["SMTP_USERNAME"] = "username"
    config["SMTP_PASSWORD"] = "password"

    config["ENABLE_MEDIA"] = True
    config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
        "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
    )
    config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
    config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001"
    config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001"

    config["BUG_TOOL_ENABLED"] = False
    config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
    config["BUG_TOOL_GITHUB_USERNAME"] = "user"
    config["BUG_TOOL_GITHUB_TOKEN"] = "token"

    config["LISTMONK_ENABLED"] = False
    config["LISTMONK_BASE_URL"] = "https://localhost"
    config["LISTMONK_API_USERNAME"] = "..."
    config["LISTMONK_API_KEY"] = "..."
    config["LISTMONK_LIST_ID"] = 3

    config["PUSH_NOTIFICATIONS_ENABLED"] = True
    config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
    config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"

    config["ACTIVENESS_PROBES_ENABLED"] = True

    # NOTE(review): "RECAPTHCA" spelling presumably matches the key names used by the
    # config schema elsewhere — confirm before renaming these keys
    config["RECAPTHCA_ENABLED"] = False
    config["RECAPTHCA_PROJECT_ID"] = "..."
    config["RECAPTHCA_API_KEY"] = "..."
    config["RECAPTHCA_SITE_KEY"] = "..."

    yield None

    # restore the pre-test configuration
    config.clear()
    config.update(prevconfig)
def run_migration_test():
    """Whether the migration test should run, per the RUN_MIGRATION_TEST env var (default false)."""
    flag = os.environ.get("RUN_MIGRATION_TEST", "false")
    return flag.lower() == "true"
@pytest.fixture
def fast_passwords():
    """
    Replace nacl password hashing with a trivial, consistent stand-in.

    Password hashing is deliberately slow, which would drag down the tests; these
    fakes remove the hashing cost while keeping hash and verify consistent.
    """

    def _hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def _verify(hashed: bytes, password: bytes) -> bool:
        return hashed == _hash(password)

    with patch("couchers.crypto.nacl.pwhash.verify", _verify), patch("couchers.crypto.nacl.pwhash.str", _hash):
        yield
def process_jobs():
    """Drain the background job queue, running jobs until none remain."""
    while True:
        if not process_job():
            break
@contextmanager
def mock_notification_email():
    """Patch the email queueing function, yield the mock, then drain background jobs."""
    with patch("couchers.email._queue_email") as mocked:
        yield mocked
        process_jobs()
@dataclass
class EmailData:
    """Keyword arguments of a single captured email-send call, in a convenient bag."""

    # display name of the sender
    sender_name: str
    # sender email address
    sender_email: str
    # recipient email address
    recipient: str
    subject: str
    # plain-text body
    plain: str
    # HTML body
    html: str
    source_data: str
    # value for the List-Unsubscribe header, if any
    list_unsubscribe_header: str
def email_fields(mock, call_ix=0):
    """Extract the call_ix-th captured email call from `mock` into an EmailData."""
    _, kwargs = mock.call_args_list[call_ix]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    # missing kwargs become None, matching kw.get semantics
    return EmailData(**{name: kwargs.get(name) for name in field_names})
@pytest.fixture
def push_collector():
    """
    See test_SendTestPushNotification for an example on how to use this fixture

    Patches the push-notification sender and yields a collector that records
    every (user_id, push) pair, with assertion helpers for tests.
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            try:
                return self.kwargs[attr]
            except KeyError:
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        """Records pushes sent during the test and offers assertion helpers."""

        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            # all pushes sent to the given user, in send order
            return [kwargs for uid, kwargs in self.pushes if uid == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            # drop-in replacement for couchers.notifications.push._push_to_user
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            # checks that the ix-th push to user_id contains every given field with the given value
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert push.kwargs[kwarg] == kwargs[kwarg], (
                    f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"
                )

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector