Coverage for src/tests/test_fixtures.py: 99%
538 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-09-14 15:31 +0000
1import os
2from concurrent import futures
3from contextlib import contextmanager
4from dataclasses import dataclass
5from datetime import date, timedelta
6from pathlib import Path
7from unittest.mock import patch
9import grpc
10import pytest
11from sqlalchemy.orm import close_all_sessions
12from sqlalchemy.sql import or_, text
14from couchers.config import config
15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
16from couchers.crypto import random_hex
17from couchers.db import _get_base_engine, session_scope
18from couchers.descriptor_pool import get_descriptor_pool
19from couchers.interceptors import (
20 CouchersMiddlewareInterceptor,
21 _try_get_and_update_user_details,
22)
23from couchers.jobs.worker import process_job
24from couchers.models import (
25 Base,
26 FriendRelationship,
27 FriendStatus,
28 HostingStatus,
29 Language,
30 LanguageAbility,
31 LanguageFluency,
32 MeetupStatus,
33 ModerationUserList,
34 PassportSex,
35 Region,
36 RegionLived,
37 RegionVisited,
38 StrongVerificationAttempt,
39 StrongVerificationAttemptStatus,
40 Upload,
41 User,
42 UserBlock,
43 UserSession,
44)
45from couchers.servicers.account import Account, Iris
46from couchers.servicers.admin import Admin
47from couchers.servicers.api import API
48from couchers.servicers.auth import Auth, create_session
49from couchers.servicers.blocking import Blocking
50from couchers.servicers.bugs import Bugs
51from couchers.servicers.communities import Communities
52from couchers.servicers.conversations import Conversations
53from couchers.servicers.discussions import Discussions
54from couchers.servicers.donations import Donations, Stripe
55from couchers.servicers.events import Events
56from couchers.servicers.gis import GIS
57from couchers.servicers.groups import Groups
58from couchers.servicers.jail import Jail
59from couchers.servicers.media import Media, get_media_auth_interceptor
60from couchers.servicers.notifications import Notifications
61from couchers.servicers.pages import Pages
62from couchers.servicers.public import Public
63from couchers.servicers.references import References
64from couchers.servicers.reporting import Reporting
65from couchers.servicers.requests import Requests
66from couchers.servicers.resources import Resources
67from couchers.servicers.search import Search
68from couchers.servicers.threads import Threads
69from couchers.sql import couchers_select as select
70from couchers.utils import create_coordinate, now
71from proto import (
72 account_pb2_grpc,
73 admin_pb2_grpc,
74 annotations_pb2,
75 api_pb2_grpc,
76 auth_pb2_grpc,
77 blocking_pb2_grpc,
78 bugs_pb2_grpc,
79 communities_pb2_grpc,
80 conversations_pb2_grpc,
81 discussions_pb2_grpc,
82 donations_pb2_grpc,
83 events_pb2_grpc,
84 gis_pb2_grpc,
85 groups_pb2_grpc,
86 iris_pb2_grpc,
87 jail_pb2_grpc,
88 media_pb2_grpc,
89 notifications_pb2_grpc,
90 pages_pb2_grpc,
91 public_pb2_grpc,
92 references_pb2_grpc,
93 reporting_pb2_grpc,
94 requests_pb2_grpc,
95 resources_pb2_grpc,
96 search_pb2_grpc,
97 stripe_pb2_grpc,
98 threads_pb2_grpc,
99)
def drop_all():
    """Drop everything currently in the database and recreate the empty schemas/extensions."""
    # postgis is required for all the Geographic Information System (GIS) stuff
    # pg_trgm is required for trigram based search
    # btree_gist is required for gist-based exclusion constraints
    ddl = (
        "DROP SCHEMA IF EXISTS public CASCADE;"
        "DROP SCHEMA IF EXISTS logging CASCADE;"
        "DROP EXTENSION IF EXISTS postgis CASCADE;"
        "CREATE SCHEMA public;"
        "CREATE SCHEMA logging;"
        "CREATE EXTENSION postgis;"
        "CREATE EXTENSION pg_trgm;"
        "CREATE EXTENSION btree_gist;"
    )
    with session_scope() as session:
        session.execute(text(ddl))

    # reset the database connection pool: postgres caches some stuff per-connection about objects
    # and will otherwise sometimes error out with
    # "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585" and similar
    _get_base_engine().dispose()

    close_all_sessions()
def create_schema_from_models():
    """
    Create everything from the current models, not incrementally
    through migrations.
    """
    # install the slugify SQL function first; table defaults may depend on it
    slugify_sql = (Path(__file__).parent / "slugify.sql").read_text()
    with session_scope() as session:
        session.execute(text(slugify_sql))

    Base.metadata.create_all(_get_base_engine())
def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database

    Loads a fixed set of regions and languages plus a fake timezone-areas SQL dump.
    """
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f:
        tz_sql = f.read()

    session.add_all(Region(code=code, name=name) for code, name in regions)
    session.add_all(Language(code=code, name=name) for code, name in languages)

    session.execute(text(tz_sql))
def recreate_database():
    """
    Connect to a running Postgres database, build it using metadata.create_all()
    """
    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # nuke whatever is there, then rebuild from the models (not via migrations)
    drop_all()
    create_schema_from_models()

    with session_scope() as session:
        populate_testing_resources(session)
@pytest.fixture()
def db():
    """
    Pytest fixture to connect to a running Postgres database and build it using metadata.create_all()
    """
    recreate_database()
def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Args:
        delete_user: if True, mark the user deleted after session creation (a deleted user
            couldn't log in, so the session must be created first)
        complete_profile: if True, attach an avatar Upload and a long "about me" so the
            profile counts as complete
        strong_verification: if True, attach a succeeded StrongVerificationAttempt
        **kwargs: overrides for any default User column value

    Returns:
        (user, token): a detached User object and a valid session token
    """
    # NOTE(review): `auth` appears unused — presumably vestigial; confirm Auth() construction
    # has no required side effects before removing
    auth = Auth()

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "Woman",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied overrides win over the defaults above
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        # flush to get user.id for the dependent rows below
        session.flush()

        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        # minimal stand-in for the servicer context create_session expects (only .headers is read)
        class _MockCouchersContext:
            @property
            def headers(self):
                return {}

        token, _ = create_session(_MockCouchersContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # this detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session, username):
    """Look up a user's id and their session token by username."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token
def make_friends(user1, user2):
    """Create an accepted FriendRelationship from user1 to user2."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        block = UserBlock(
            blocking_user_id=user1.id,
            blocked_user_id=user2.id,
        )
        session.add(block)
        session.commit()
def make_user_invisible(user_id):
    """Ban the given user, which hides them from normal queries."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between the two users (in either direction), or None.

    The object is expunged from the session so it can be inspected after returning.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # must be SQL conjunctions (&), not Python `and`: `and` short-circuits on the
                    # truthiness of the clause object and silently drops the to_user_id condition
                    # from the emitted query
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        # expunge(None) raises, so only detach when a relationship was found
        if friend_relationship is not None:
            session.expunge(friend_relationship)
        return friend_relationship
def add_users_to_new_moderation_list(users):
    """Group users as duplicated accounts"""
    with session_scope() as session:
        user_list = ModerationUserList()
        session.add(user_list)
        # flush so the list gets an id before attaching members
        session.flush()
        for user in users:
            user_list.users.append(session.get(User, user.id))
        return user_list.id
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token):
        # session token presented as the couchers-sesh cookie on every call
        self.token = token

    def __call__(self, context, callback):
        # invoked by gRPC per-call; supply the cookie header with no error
        callback((("cookie", f"couchers-sesh={self.token}"),), None)
@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields:
        (AuthStub, metadata_interceptor): the stub plus an interceptor that records the
        response headers of the most recent call (e.g. to inspect Set-Cookie)
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        # port 0 lets the OS pick a free port
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:

                # client interceptor that captures each response's initial metadata so tests
                # can assert on headers
                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    def __init__(self):
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        self.latest_headers = dict(call.initial_metadata())
                        # raw (possibly repeated-key) form of the same metadata
                        self.latest_header_raw = call.initial_metadata()
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            server.stop(None).wait()
@contextmanager
def api_session(token):
    """
    Create an API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    yield api_pb2_grpc.APIStub(chan)
@contextmanager
def real_api_session(token):
    """
    Create an API for testing over real TCP sockets, using the token for auth.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """
    Create an Admin service for testing over real TCP sockets, using the token for auth.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_account_session(token):
    """
    Create an Account service for testing over real TCP sockets, using the token for auth.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_jail_session(token):
    """
    Create a Jail service for testing over real TCP sockets, using the token for auth.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), grpc_server)
        grpc_server.start()

        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), cookie_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def gis_session(token):
    """Yield a GIS API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), chan)
    yield gis_pb2_grpc.GISStub(chan)
@contextmanager
def public_session():
    """Yield a Public API stub backed by an unauthenticated FakeChannel."""
    chan = fake_channel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), chan)
    yield public_pb2_grpc.PublicStub(chan)
class FakeRpcError(grpc.RpcError):
    """In-process stand-in for an RPC failure, raised by FakeChannel.abort."""

    def __init__(self, code, details):
        self._code = code
        self._details = details

    def code(self):
        # mirrors grpc.Call.code()
        return self._code

    def details(self):
        # mirrors grpc.Call.details()
        return self._details
def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """
    Assert that the caller may invoke the given method, based on the service-level
    auth annotation stored in the proto descriptor.

    Args:
        method: full gRPC method path, of the form "/org.couchers.api.core.API/GetUser"
        user_id: id of the logged-in user, or falsy if not logged in
        is_jailed: whether the user is jailed
        is_superuser: whether the user is a superuser
        token_expiry: unused here; carried along with the other session-token details
    """
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, method_name = method.split("/")

    service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = service_options.Extensions[annotations_pb2.auth_level]
    # every service must declare a known auth level
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    if not user_id:
        # anonymous callers may only hit open services
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
    else:
        assert not (auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser), (
            "Non-superuser tried to call superuser API"
        )
        assert not (
            is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED]
        ), "User is jailed but tried to call non-open/non-jailed API"
class FakeChannel:
    """
    In-process stand-in for a gRPC channel/server pair: servicers are registered onto it
    like a server, and stubs call through it like a channel, without any sockets.

    It also doubles as the servicer context passed to handlers (hence abort()).
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        # maps full method URI -> registered method handler
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def is_logged_in(self):
        return self.user_id is not None

    def abort(self, code, details):
        # mimics grpc.ServicerContext.abort by raising a catchable error
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        handler = self.handlers[uri]

        # enforce the service's auth annotation, as the real interceptors would
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                response = handler.unary_unary(request, self, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler
def fake_channel(token=None):
    """Build a FakeChannel; if a session token is given, resolve its auth details first."""
    if not token:
        return FakeChannel()
    user_id, is_jailed, is_superuser, token_expiry, _ui_language_preference = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)
@contextmanager
def conversations_session(token):
    """
    Create a Conversations API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    yield conversations_pb2_grpc.ConversationsStub(chan)
@contextmanager
def requests_session(token):
    """
    Create a Requests API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    yield requests_pb2_grpc.RequestsStub(chan)
@contextmanager
def threads_session(token):
    """Yield a Threads API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    yield threads_pb2_grpc.ThreadsStub(chan)
@contextmanager
def discussions_session(token):
    """Yield a Discussions API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    yield discussions_pb2_grpc.DiscussionsStub(chan)
@contextmanager
def donations_session(token):
    """Yield a Donations API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    yield donations_pb2_grpc.DonationsStub(chan)
@contextmanager
def real_stripe_session():
    """
    Create a Stripe service for testing over real TCP sockets (no auth needed).
    """
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), grpc_server)
        grpc_server.start()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def real_iris_session():
    """Create an Iris service for testing over real TCP sockets (no auth needed)."""
    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), grpc_server)
        grpc_server.start()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", grpc.local_channel_credentials()) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            grpc_server.stop(None).wait()
@contextmanager
def pages_session(token):
    """Yield a Pages API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    yield pages_pb2_grpc.PagesStub(chan)
@contextmanager
def communities_session(token):
    """Yield a Communities API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    yield communities_pb2_grpc.CommunitiesStub(chan)
@contextmanager
def groups_session(token):
    """Yield a Groups API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    yield groups_pb2_grpc.GroupsStub(chan)
@contextmanager
def blocking_session(token):
    """Yield a Blocking API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    yield blocking_pb2_grpc.BlockingStub(chan)
@contextmanager
def notifications_session(token):
    """Yield a Notifications API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    yield notifications_pb2_grpc.NotificationsStub(chan)
@contextmanager
def account_session(token):
    """
    Create an Account API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    yield account_pb2_grpc.AccountStub(chan)
@contextmanager
def search_session(token):
    """
    Create a Search API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    yield search_pb2_grpc.SearchStub(chan)
@contextmanager
def references_session(token):
    """
    Create a References API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    yield references_pb2_grpc.ReferencesStub(chan)
@contextmanager
def reporting_session(token):
    """Yield a Reporting API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    yield reporting_pb2_grpc.ReportingStub(chan)
@contextmanager
def events_session(token):
    """Yield an Events API stub backed by a FakeChannel authed with the token."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    yield events_pb2_grpc.EventsStub(chan)
@contextmanager
def bugs_session(token=None):
    """Yield a Bugs API stub; auth is optional since bug reporting allows anonymous users."""
    chan = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    yield bugs_pb2_grpc.BugsStub(chan)
@contextmanager
def resources_session():
    """Yield a Resources API stub backed by an unauthenticated FakeChannel."""
    chan = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    yield resources_pb2_grpc.ResourcesStub(chan)
@contextmanager
def media_session(bearer_token):
    """
    Create a fresh Media API for testing, uses the bearer token for media auth
    """
    media_auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        grpc_server = grpc.server(pool, interceptors=[media_auth_interceptor])
        bound_port = grpc_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), grpc_server)
        grpc_server.start()

        bearer_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), bearer_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            grpc_server.stop(None).wait()
@pytest.fixture(scope="class")
def testconfig():
    """
    Class-scoped fixture that overwrites the global config with test values,
    restoring the original config on teardown.
    """
    # snapshot the current config so it can be restored after the tests
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config["IN_TEST"] = True

    config["DEV"] = True
    config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
    config["VERSION"] = "testing_version"
    config["BASE_URL"] = "http://localhost:3000"
    config["BACKEND_BASE_URL"] = "http://localhost:8888"
    config["CONSOLE_BASE_URL"] = "http://localhost:8888"
    config["COOKIE_DOMAIN"] = "localhost"

    # SMS / email delivery is disabled in tests
    config["ENABLE_SMS"] = False
    config["SMS_SENDER_ID"] = "invalid"

    config["ENABLE_EMAIL"] = False
    config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
    config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
    config["NOTIFICATION_PREFIX"] = "[TEST] "
    config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
    config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
    config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"

    config["ENABLE_DONATIONS"] = False
    config["STRIPE_API_KEY"] = ""
    config["STRIPE_WEBHOOK_SECRET"] = ""
    config["STRIPE_RECURRING_PRODUCT_ID"] = ""

    config["ENABLE_STRONG_VERIFICATION"] = False
    config["IRIS_ID_PUBKEY"] = ""
    config["IRIS_ID_SECRET"] = ""
    # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
    config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    config["SMTP_HOST"] = "localhost"
    config["SMTP_PORT"] = 587
    config["SMTP_USERNAME"] = "username"
    config["SMTP_PASSWORD"] = "password"

    config["ENABLE_MEDIA"] = True
    config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
        "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
    )
    config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
    config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001"
    config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001"

    config["BUG_TOOL_ENABLED"] = False
    config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
    config["BUG_TOOL_GITHUB_USERNAME"] = "user"
    config["BUG_TOOL_GITHUB_TOKEN"] = "token"

    config["LISTMONK_ENABLED"] = False
    config["LISTMONK_BASE_URL"] = "https://localhost"
    config["LISTMONK_API_USERNAME"] = "..."
    config["LISTMONK_API_KEY"] = "..."
    config["LISTMONK_LIST_ID"] = 3

    config["PUSH_NOTIFICATIONS_ENABLED"] = True
    config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
    config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"

    config["ACTIVENESS_PROBES_ENABLED"] = True

    # NOTE(review): "RECAPTHCA" looks misspelled ("RECAPTCHA") — these keys must match the names
    # declared in couchers.config, so confirm there before renaming anywhere
    config["RECAPTHCA_ENABLED"] = False
    config["RECAPTHCA_PROJECT_ID"] = "..."
    config["RECAPTHCA_API_KEY"] = "..."
    config["RECAPTHCA_SITE_KEY"] = "..."

    yield None

    # teardown: restore the pre-test config
    config.clear()
    config.update(prevconfig)
def run_migration_test():
    """Whether migration tests should run, per the RUN_MIGRATION_TEST env var ("true" enables)."""
    flag = os.environ.get("RUN_MIGRATION_TEST", "false")
    return flag.lower() == "true"
@pytest.fixture
def fast_passwords():
    # password hashing is deliberately slow, which drags down the tests; swap the real hash and
    # verify functions for a trivial reversible stand-in for the fixture's duration
    def fake_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def fake_verify(hashed: bytes, password: bytes) -> bool:
        return hashed == fake_hash(password)

    with patch("couchers.crypto.nacl.pwhash.verify", fake_verify), patch("couchers.crypto.nacl.pwhash.str", fake_hash):
        yield
def process_jobs():
    """Drain the background job queue, processing jobs until none remain."""
    more_work = True
    while more_work:
        more_work = process_job()
@contextmanager
def mock_notification_email():
    """Patch out the email queue, then drain the job queue on exit so notification jobs run."""
    with patch("couchers.email._queue_email") as mock:
        yield mock
    process_jobs()
@dataclass
class EmailData:
    # fields mirror the keyword arguments passed to the patched couchers.email._queue_email
    sender_name: str
    sender_email: str
    recipient: str
    subject: str
    plain: str  # plain-text body
    html: str  # HTML body
    source_data: str
    list_unsubscribe_header: str
def email_fields(mock, call_ix=0):
    """Extract the keyword args of the call_ix-th queued email call into an EmailData."""
    _args, kwargs = mock.call_args_list[call_ix]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    return EmailData(**{name: kwargs.get(name) for name in field_names})
@pytest.fixture
def push_collector():
    """
    See test_SendTestPushNotification for an example on how to use this fixture
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            try:
                return self.kwargs[attr]
            except KeyError:
                # surface a proper AttributeError so hasattr() etc. behave correctly
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        """Records every push notification sent during the test, with assertion helpers."""

        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            # all pushes sent to the given user, in send order
            return [kwargs for uid, kwargs in self.pushes if uid == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            # signature mirrors couchers.notifications.push._push_to_user, which this replaces
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            # check that the ix-th push to user_id contains each given field with the given value
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert push.kwargs[kwarg] == kwargs[kwarg], (
                    f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"
                )

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector