Coverage for src/tests/test_fixtures.py: 98%
514 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-04-16 15:13 +0000
1import os
2from concurrent import futures
3from contextlib import contextmanager
4from dataclasses import dataclass
5from datetime import date, timedelta
6from pathlib import Path
7from unittest.mock import patch
9import grpc
10import pytest
11from sqlalchemy.orm import close_all_sessions
12from sqlalchemy.sql import or_, text
14from couchers.config import config
15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
16from couchers.crypto import random_hex
17from couchers.db import _get_base_engine, session_scope
18from couchers.descriptor_pool import get_descriptor_pool
19from couchers.interceptors import (
20 AuthValidatorInterceptor,
21 CookieInterceptor,
22 SessionInterceptor,
23 _try_get_and_update_user_details,
24)
25from couchers.jobs.worker import process_job
26from couchers.models import (
27 Base,
28 FriendRelationship,
29 FriendStatus,
30 HostingStatus,
31 Language,
32 LanguageAbility,
33 LanguageFluency,
34 MeetupStatus,
35 PassportSex,
36 Region,
37 RegionLived,
38 RegionVisited,
39 StrongVerificationAttempt,
40 StrongVerificationAttemptStatus,
41 Upload,
42 User,
43 UserBlock,
44 UserSession,
45)
46from couchers.servicers.account import Account, Iris
47from couchers.servicers.admin import Admin
48from couchers.servicers.api import API
49from couchers.servicers.auth import Auth, create_session
50from couchers.servicers.blocking import Blocking
51from couchers.servicers.bugs import Bugs
52from couchers.servicers.communities import Communities
53from couchers.servicers.conversations import Conversations
54from couchers.servicers.discussions import Discussions
55from couchers.servicers.donations import Donations, Stripe
56from couchers.servicers.events import Events
57from couchers.servicers.gis import GIS
58from couchers.servicers.groups import Groups
59from couchers.servicers.jail import Jail
60from couchers.servicers.media import Media, get_media_auth_interceptor
61from couchers.servicers.notifications import Notifications
62from couchers.servicers.pages import Pages
63from couchers.servicers.references import References
64from couchers.servicers.reporting import Reporting
65from couchers.servicers.requests import Requests
66from couchers.servicers.resources import Resources
67from couchers.servicers.search import Search
68from couchers.servicers.threads import Threads
69from couchers.sql import couchers_select as select
70from couchers.utils import create_coordinate, now
71from proto import (
72 account_pb2_grpc,
73 admin_pb2_grpc,
74 annotations_pb2,
75 api_pb2_grpc,
76 auth_pb2_grpc,
77 blocking_pb2_grpc,
78 bugs_pb2_grpc,
79 communities_pb2_grpc,
80 conversations_pb2_grpc,
81 discussions_pb2_grpc,
82 donations_pb2_grpc,
83 events_pb2_grpc,
84 gis_pb2_grpc,
85 groups_pb2_grpc,
86 iris_pb2_grpc,
87 jail_pb2_grpc,
88 media_pb2_grpc,
89 notifications_pb2_grpc,
90 pages_pb2_grpc,
91 references_pb2_grpc,
92 reporting_pb2_grpc,
93 requests_pb2_grpc,
94 resources_pb2_grpc,
95 search_pb2_grpc,
96 stripe_pb2_grpc,
97 threads_pb2_grpc,
98)
101def drop_all():
102 """drop everything currently in the database"""
103 with session_scope() as session:
104 # postgis is required for all the Geographic Information System (GIS) stuff
105 # pg_trgm is required for trigram based search
106 # btree_gist is required for gist-based exclusion constraints
107 session.execute(
108 text(
109 "DROP SCHEMA IF EXISTS public CASCADE;"
110 "DROP SCHEMA IF EXISTS logging CASCADE;"
111 "DROP EXTENSION IF EXISTS postgis CASCADE;"
112 "CREATE SCHEMA public;"
113 "CREATE SCHEMA logging;"
114 "CREATE EXTENSION postgis;"
115 "CREATE EXTENSION pg_trgm;"
116 "CREATE EXTENSION btree_gist;"
117 )
118 )
120 # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise
121 # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
122 # and similar errors
123 _get_base_engine().dispose()
125 close_all_sessions()
128def create_schema_from_models():
129 """
130 Create everything from the current models, not incrementally
131 through migrations.
132 """
134 # create the slugify function
135 functions = Path(__file__).parent / "slugify.sql"
136 with open(functions) as f, session_scope() as session:
137 session.execute(text(f.read()))
139 Base.metadata.create_all(_get_base_engine())
142def populate_testing_resources(session):
143 """
144 Testing version of couchers.resources.copy_resources_to_database
145 """
146 regions = [
147 ("AUS", "Australia"),
148 ("CAN", "Canada"),
149 ("CHE", "Switzerland"),
150 ("CUB", "Cuba"),
151 ("CXR", "Christmas Island"),
152 ("CZE", "Czechia"),
153 ("DEU", "Germany"),
154 ("EGY", "Egypt"),
155 ("ESP", "Spain"),
156 ("EST", "Estonia"),
157 ("FIN", "Finland"),
158 ("FRA", "France"),
159 ("GBR", "United Kingdom"),
160 ("GEO", "Georgia"),
161 ("GHA", "Ghana"),
162 ("GRC", "Greece"),
163 ("HKG", "Hong Kong"),
164 ("IRL", "Ireland"),
165 ("ISR", "Israel"),
166 ("ITA", "Italy"),
167 ("JPN", "Japan"),
168 ("LAO", "Laos"),
169 ("MEX", "Mexico"),
170 ("MMR", "Myanmar"),
171 ("NAM", "Namibia"),
172 ("NLD", "Netherlands"),
173 ("NZL", "New Zealand"),
174 ("POL", "Poland"),
175 ("PRK", "North Korea"),
176 ("REU", "Réunion"),
177 ("SGP", "Singapore"),
178 ("SWE", "Sweden"),
179 ("THA", "Thailand"),
180 ("TUR", "Turkey"),
181 ("TWN", "Taiwan"),
182 ("USA", "United States"),
183 ("VNM", "Vietnam"),
184 ]
186 languages = [
187 ("arb", "Arabic (Standard)"),
188 ("deu", "German"),
189 ("eng", "English"),
190 ("fin", "Finnish"),
191 ("fra", "French"),
192 ("heb", "Hebrew"),
193 ("hun", "Hungarian"),
194 ("jpn", "Japanese"),
195 ("pol", "Polish"),
196 ("swe", "Swedish"),
197 ("cmn", "Chinese (Mandarin)"),
198 ]
200 with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f:
201 tz_sql = f.read()
203 for code, name in regions:
204 session.add(Region(code=code, name=name))
206 for code, name in languages:
207 session.add(Language(code=code, name=name))
209 session.execute(text(tz_sql))
212def recreate_database():
213 """
214 Connect to a running Postgres database, build it using metadata.create_all()
215 """
217 # running in non-UTC catches some timezone errors
218 os.environ["TZ"] = "America/New_York"
220 # drop everything currently in the database
221 drop_all()
223 # create everything from the current models, not incrementally through migrations
224 create_schema_from_models()
226 with session_scope() as session:
227 populate_testing_resources(session)
230@pytest.fixture()
231def db():
232 """
233 Pytest fixture to connect to a running Postgres database and build it using metadata.create_all()
234 """
236 recreate_database()
239def generate_user(*, delete_user=False, complete_profile=True, strong_verification=False, **kwargs):
240 """
241 Create a new user, return session token
243 The user is detached from any session, and you can access its static attributes, but you can't modify it
245 Use this most of the time
246 """
247 auth = Auth()
249 with session_scope() as session:
250 # default args
251 username = "test_user_" + random_hex(16)
252 user_opts = {
253 "username": username,
254 "email": f"{username}@dev.couchers.org",
255 # password is just 'password'
256 # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
257 "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
258 "name": username.capitalize(),
259 "hosting_status": HostingStatus.cant_host,
260 "meetup_status": MeetupStatus.open_to_meetup,
261 "city": "Testing city",
262 "hometown": "Test hometown",
263 "community_standing": 0.5,
264 "birthdate": date(year=2000, month=1, day=1),
265 "gender": "Woman",
266 "pronouns": "",
267 "occupation": "Tester",
268 "education": "UST(esting)",
269 "about_me": "I test things",
270 "things_i_like": "Code",
271 "about_place": "My place has a lot of testing paraphenelia",
272 "additional_information": "I can be a bit testy",
273 # you need to make sure to update this logic to make sure the user is jailed/not on request
274 "accepted_tos": TOS_VERSION,
275 "accepted_community_guidelines": GUIDELINES_VERSION,
276 "geom": create_coordinate(40.7108, -73.9740),
277 "geom_radius": 100,
278 "onboarding_emails_sent": 1,
279 "last_onboarding_email_sent": now(),
280 "has_donated": True,
281 }
283 for key, value in kwargs.items():
284 user_opts[key] = value
286 user = User(**user_opts)
287 session.add(user)
288 session.flush()
290 session.add(RegionVisited(user_id=user.id, region_code="CHE"))
291 session.add(RegionVisited(user_id=user.id, region_code="REU"))
292 session.add(RegionVisited(user_id=user.id, region_code="FIN"))
294 session.add(RegionLived(user_id=user.id, region_code="ESP"))
295 session.add(RegionLived(user_id=user.id, region_code="FRA"))
296 session.add(RegionLived(user_id=user.id, region_code="EST"))
298 session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
299 session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))
301 # this expires the user, so now it's "dirty"
302 session.commit()
304 class _DummyContext:
305 def invocation_metadata(self):
306 return {}
308 token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)
310 # deleted user aborts session creation, hence this follows and necessitates a second commit
311 if delete_user:
312 user.is_deleted = True
314 user.recommendation_score = 1e10 - user.id
316 if complete_profile:
317 key = random_hex(32)
318 filename = random_hex(32) + ".jpg"
319 session.add(
320 Upload(
321 key=key,
322 filename=filename,
323 creator_user_id=user.id,
324 )
325 )
326 session.flush()
327 user.avatar_key = key
328 user.about_me = "I have a complete profile!\n" * 20
330 if strong_verification:
331 attempt = StrongVerificationAttempt(
332 verification_attempt_token=f"verification_attempt_token_{user.id}",
333 user_id=user.id,
334 status=StrongVerificationAttemptStatus.succeeded,
335 has_full_data=True,
336 passport_encrypted_data=b"not real",
337 passport_date_of_birth=user.birthdate,
338 passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
339 user.gender, PassportSex.unspecified
340 ),
341 has_minimal_data=True,
342 passport_expiry_date=date.today() + timedelta(days=10),
343 passport_nationality="UTO",
344 passport_last_three_document_chars=f"{user.id:03}",
345 iris_token=f"iris_token_{user.id}",
346 iris_session_id=user.id,
347 )
348 session.add(attempt)
349 session.flush()
350 assert attempt.has_strong_verification(user)
352 session.commit()
354 assert user.has_completed_profile == complete_profile
356 # refresh it, undoes the expiry
357 session.refresh(user)
359 # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
360 user.timezone # noqa: B018
362 # allows detaches the user from the session, allowing its use outside this session
363 session.expunge(user)
365 return user, token
368def get_user_id_and_token(session, username):
369 user_id = session.execute(select(User).where(User.username == username)).scalar_one().id
370 token = session.execute(select(UserSession).where(UserSession.user_id == user_id)).scalar_one().token
371 return user_id, token
374def make_friends(user1, user2):
375 with session_scope() as session:
376 friend_relationship = FriendRelationship(
377 from_user_id=user1.id,
378 to_user_id=user2.id,
379 status=FriendStatus.accepted,
380 )
381 session.add(friend_relationship)
384def make_user_block(user1, user2):
385 with session_scope() as session:
386 user_block = UserBlock(
387 blocking_user_id=user1.id,
388 blocked_user_id=user2.id,
389 )
390 session.add(user_block)
391 session.commit()
394def make_user_invisible(user_id):
395 with session_scope() as session:
396 session.execute(select(User).where(User.id == user_id)).scalar_one().is_banned = True
399# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
400def get_friend_relationship(user1, user2):
401 with session_scope() as session:
402 friend_relationship = session.execute(
403 select(FriendRelationship).where(
404 or_(
405 (FriendRelationship.from_user_id == user1.id and FriendRelationship.to_user_id == user2.id),
406 (FriendRelationship.from_user_id == user2.id and FriendRelationship.to_user_id == user1.id),
407 )
408 )
409 ).scalar_one_or_none()
411 session.expunge(friend_relationship)
412 return friend_relationship
415class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
416 """
417 Injects the right `cookie: couchers-sesh=...` header into the metadata
418 """
420 def __init__(self, token):
421 self.token = token
423 def __call__(self, context, callback):
424 callback((("cookie", f"couchers-sesh={self.token}"),), None)
427@contextmanager
428def auth_api_session(grpc_channel_options=()):
429 """
430 Create an Auth API for testing
432 This needs to use the real server since it plays around with headers
433 """
434 with futures.ThreadPoolExecutor(1) as executor:
435 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
436 port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
437 auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
438 server.start()
440 try:
441 with grpc.secure_channel(
442 f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
443 ) as channel:
445 class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
446 def __init__(self):
447 self.latest_headers = {}
449 def intercept_unary_unary(self, continuation, client_call_details, request):
450 call = continuation(client_call_details, request)
451 self.latest_headers = dict(call.initial_metadata())
452 self.latest_header_raw = call.initial_metadata()
453 return call
455 metadata_interceptor = _MetadataKeeperInterceptor()
456 channel = grpc.intercept_channel(channel, metadata_interceptor)
457 yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
458 finally:
459 server.stop(None).wait()
462@contextmanager
463def api_session(token):
464 """
465 Create an API for testing, uses the token for auth
466 """
467 channel = fake_channel(token)
468 api_pb2_grpc.add_APIServicer_to_server(API(), channel)
469 yield api_pb2_grpc.APIStub(channel)
472@contextmanager
473def real_api_session(token):
474 """
475 Create an API for testing, using TCP sockets, uses the token for auth
476 """
477 with futures.ThreadPoolExecutor(1) as executor:
478 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
479 port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
480 api_pb2_grpc.add_APIServicer_to_server(API(), server)
481 server.start()
483 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
484 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)
486 try:
487 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
488 yield api_pb2_grpc.APIStub(channel)
489 finally:
490 server.stop(None).wait()
493@contextmanager
494def real_admin_session(token):
495 """
496 Create a Admin service for testing, using TCP sockets, uses the token for auth
497 """
498 with futures.ThreadPoolExecutor(1) as executor:
499 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
500 port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
501 admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
502 server.start()
504 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
505 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)
507 try:
508 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
509 yield admin_pb2_grpc.AdminStub(channel)
510 finally:
511 server.stop(None).wait()
514@contextmanager
515def real_account_session(token):
516 """
517 Create a Account service for testing, using TCP sockets, uses the token for auth
518 """
519 with futures.ThreadPoolExecutor(1) as executor:
520 server = grpc.server(
521 executor, interceptors=[AuthValidatorInterceptor(), CookieInterceptor(), SessionInterceptor()]
522 )
523 port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
524 account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
525 server.start()
527 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
528 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)
530 try:
531 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
532 yield account_pb2_grpc.AccountStub(channel)
533 finally:
534 server.stop(None).wait()
537@contextmanager
538def real_jail_session(token):
539 """
540 Create a Jail service for testing, using TCP sockets, uses the token for auth
541 """
542 with futures.ThreadPoolExecutor(1) as executor:
543 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
544 port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
545 jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
546 server.start()
548 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
549 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)
551 try:
552 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
553 yield jail_pb2_grpc.JailStub(channel)
554 finally:
555 server.stop(None).wait()
558@contextmanager
559def gis_session(token):
560 channel = fake_channel(token)
561 gis_pb2_grpc.add_GISServicer_to_server(GIS(), channel)
562 yield gis_pb2_grpc.GISStub(channel)
565class FakeRpcError(grpc.RpcError):
566 def __init__(self, code, details):
567 self._code = code
568 self._details = details
570 def code(self):
571 return self._code
573 def details(self):
574 return self._details
577def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
578 # method is of the form "/org.couchers.api.core.API/GetUser"
579 _, service_name, method_name = method.split("/")
581 service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
582 auth_level = service_options.Extensions[annotations_pb2.auth_level]
583 assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
584 assert auth_level in [
585 annotations_pb2.AUTH_LEVEL_OPEN,
586 annotations_pb2.AUTH_LEVEL_JAILED,
587 annotations_pb2.AUTH_LEVEL_SECURE,
588 annotations_pb2.AUTH_LEVEL_ADMIN,
589 ]
591 if not user_id:
592 assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
593 else:
594 assert not (auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser), (
595 "Non-superuser tried to call superuser API"
596 )
597 assert not (
598 is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED]
599 ), "User is jailed but tried to call non-open/non-jailed API"
602class FakeChannel:
603 def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
604 self.handlers = {}
605 self.user_id = user_id
606 self._is_jailed = is_jailed
607 self._is_superuser = is_superuser
608 self._token_expiry = token_expiry
610 def abort(self, code, details):
611 raise FakeRpcError(code, details)
613 def add_generic_rpc_handlers(self, generic_rpc_handlers):
614 from grpc._server import _validate_generic_rpc_handlers
616 _validate_generic_rpc_handlers(generic_rpc_handlers)
618 self.handlers.update(generic_rpc_handlers[0]._method_handlers)
620 def unary_unary(self, uri, request_serializer, response_deserializer):
621 handler = self.handlers[uri]
623 _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)
625 def fake_handler(request):
626 # Do a full serialization cycle on the request and the
627 # response to catch accidental use of unserializable data.
628 request = handler.request_deserializer(request_serializer(request))
630 with session_scope() as session:
631 response = handler.unary_unary(request, self, session)
633 return response_deserializer(handler.response_serializer(response))
635 return fake_handler
638def fake_channel(token=None):
639 if token:
640 user_id, is_jailed, is_superuser, token_expiry, ui_language_preference = _try_get_and_update_user_details(
641 token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
642 )
643 return FakeChannel(user_id=user_id, is_jailed=is_jailed, is_superuser=is_superuser, token_expiry=token_expiry)
644 return FakeChannel()
647@contextmanager
648def conversations_session(token):
649 """
650 Create a Conversations API for testing, uses the token for auth
651 """
652 channel = fake_channel(token)
653 conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), channel)
654 yield conversations_pb2_grpc.ConversationsStub(channel)
657@contextmanager
658def requests_session(token):
659 """
660 Create a Requests API for testing, uses the token for auth
661 """
662 channel = fake_channel(token)
663 requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), channel)
664 yield requests_pb2_grpc.RequestsStub(channel)
667@contextmanager
668def threads_session(token):
669 channel = fake_channel(token)
670 threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), channel)
671 yield threads_pb2_grpc.ThreadsStub(channel)
674@contextmanager
675def discussions_session(token):
676 channel = fake_channel(token)
677 discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), channel)
678 yield discussions_pb2_grpc.DiscussionsStub(channel)
681@contextmanager
682def donations_session(token):
683 channel = fake_channel(token)
684 donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), channel)
685 yield donations_pb2_grpc.DonationsStub(channel)
688@contextmanager
689def real_stripe_session():
690 """
691 Create a Stripe service for testing, using TCP sockets
692 """
693 with futures.ThreadPoolExecutor(1) as executor:
694 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
695 port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
696 stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
697 server.start()
699 creds = grpc.local_channel_credentials()
701 try:
702 with grpc.secure_channel(f"localhost:{port}", creds) as channel:
703 yield stripe_pb2_grpc.StripeStub(channel)
704 finally:
705 server.stop(None).wait()
708@contextmanager
709def real_iris_session():
710 with futures.ThreadPoolExecutor(1) as executor:
711 server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
712 port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
713 iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
714 server.start()
716 creds = grpc.local_channel_credentials()
718 try:
719 with grpc.secure_channel(f"localhost:{port}", creds) as channel:
720 yield iris_pb2_grpc.IrisStub(channel)
721 finally:
722 server.stop(None).wait()
725@contextmanager
726def pages_session(token):
727 channel = fake_channel(token)
728 pages_pb2_grpc.add_PagesServicer_to_server(Pages(), channel)
729 yield pages_pb2_grpc.PagesStub(channel)
732@contextmanager
733def communities_session(token):
734 channel = fake_channel(token)
735 communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), channel)
736 yield communities_pb2_grpc.CommunitiesStub(channel)
739@contextmanager
740def groups_session(token):
741 channel = fake_channel(token)
742 groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), channel)
743 yield groups_pb2_grpc.GroupsStub(channel)
746@contextmanager
747def blocking_session(token):
748 channel = fake_channel(token)
749 blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), channel)
750 yield blocking_pb2_grpc.BlockingStub(channel)
753@contextmanager
754def notifications_session(token):
755 channel = fake_channel(token)
756 notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), channel)
757 yield notifications_pb2_grpc.NotificationsStub(channel)
760@contextmanager
761def account_session(token):
762 """
763 Create a Account API for testing, uses the token for auth
764 """
765 channel = fake_channel(token)
766 account_pb2_grpc.add_AccountServicer_to_server(Account(), channel)
767 yield account_pb2_grpc.AccountStub(channel)
770@contextmanager
771def search_session(token):
772 """
773 Create a Search API for testing, uses the token for auth
774 """
775 channel = fake_channel(token)
776 search_pb2_grpc.add_SearchServicer_to_server(Search(), channel)
777 yield search_pb2_grpc.SearchStub(channel)
780@contextmanager
781def references_session(token):
782 """
783 Create a References API for testing, uses the token for auth
784 """
785 channel = fake_channel(token)
786 references_pb2_grpc.add_ReferencesServicer_to_server(References(), channel)
787 yield references_pb2_grpc.ReferencesStub(channel)
790@contextmanager
791def reporting_session(token):
792 channel = fake_channel(token)
793 reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), channel)
794 yield reporting_pb2_grpc.ReportingStub(channel)
797@contextmanager
798def events_session(token):
799 channel = fake_channel(token)
800 events_pb2_grpc.add_EventsServicer_to_server(Events(), channel)
801 yield events_pb2_grpc.EventsStub(channel)
804@contextmanager
805def bugs_session(token=None):
806 channel = fake_channel(token)
807 bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), channel)
808 yield bugs_pb2_grpc.BugsStub(channel)
811@contextmanager
812def resources_session():
813 channel = fake_channel()
814 resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), channel)
815 yield resources_pb2_grpc.ResourcesStub(channel)
818@contextmanager
819def media_session(bearer_token):
820 """
821 Create a fresh Media API for testing, uses the bearer token for media auth
822 """
823 media_auth_interceptor = get_media_auth_interceptor(bearer_token)
825 with futures.ThreadPoolExecutor(1) as executor:
826 server = grpc.server(executor, interceptors=[media_auth_interceptor, SessionInterceptor()])
827 port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
828 servicer = Media()
829 media_pb2_grpc.add_MediaServicer_to_server(servicer, server)
830 server.start()
832 call_creds = grpc.access_token_call_credentials(bearer_token)
833 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds)
835 try:
836 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel:
837 yield media_pb2_grpc.MediaStub(channel)
838 finally:
839 server.stop(None).wait()
842@pytest.fixture(scope="class")
843def testconfig():
844 prevconfig = config.copy()
845 config.clear()
846 config.update(prevconfig)
848 config["IN_TEST"] = True
850 config["DEV"] = True
851 config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
852 config["VERSION"] = "testing_version"
853 config["BASE_URL"] = "http://localhost:3000"
854 config["BACKEND_BASE_URL"] = "http://localhost:8888"
855 config["CONSOLE_BASE_URL"] = "http://localhost:8888"
856 config["COOKIE_DOMAIN"] = "localhost"
858 config["ENABLE_SMS"] = False
859 config["SMS_SENDER_ID"] = "invalid"
861 config["ENABLE_EMAIL"] = False
862 config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
863 config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
864 config["NOTIFICATION_PREFIX"] = "[TEST] "
865 config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
866 config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
867 config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"
869 config["ENABLE_DONATIONS"] = False
870 config["STRIPE_API_KEY"] = ""
871 config["STRIPE_WEBHOOK_SECRET"] = ""
872 config["STRIPE_RECURRING_PRODUCT_ID"] = ""
874 config["ENABLE_STRONG_VERIFICATION"] = False
875 config["IRIS_ID_PUBKEY"] = ""
876 config["IRIS_ID_SECRET"] = ""
877 # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
878 config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
879 "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
880 )
882 config["SMTP_HOST"] = "localhost"
883 config["SMTP_PORT"] = 587
884 config["SMTP_USERNAME"] = "username"
885 config["SMTP_PASSWORD"] = "password"
887 config["ENABLE_MEDIA"] = True
888 config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
889 "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
890 )
891 config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
892 config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001"
893 config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001"
895 config["BUG_TOOL_ENABLED"] = False
896 config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
897 config["BUG_TOOL_GITHUB_USERNAME"] = "user"
898 config["BUG_TOOL_GITHUB_TOKEN"] = "token"
900 config["LISTMONK_ENABLED"] = False
901 config["LISTMONK_BASE_URL"] = "https://localhost"
902 config["LISTMONK_API_USERNAME"] = "..."
903 config["LISTMONK_API_KEY"] = "..."
904 config["LISTMONK_LIST_ID"] = 3
906 config["PUSH_NOTIFICATIONS_ENABLED"] = True
907 config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
908 config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"
910 config["ACTIVENESS_PROBES_ENABLED"] = True
912 yield None
914 config.clear()
915 config.update(prevconfig)
918@pytest.fixture
919def fast_passwords():
920 # password hashing, by design, takes a lot of time, which slows down the tests. here we jump through some hoops to
921 # make this fast by removing the hashing step
923 def fast_hash(password: bytes) -> bytes:
924 return b"fake hash:" + password
926 def fast_verify(hashed: bytes, password: bytes) -> bool:
927 return hashed == fast_hash(password)
929 with patch("couchers.crypto.nacl.pwhash.verify", fast_verify):
930 with patch("couchers.crypto.nacl.pwhash.str", fast_hash):
931 yield
934def process_jobs():
935 while process_job():
936 pass
939@contextmanager
940def mock_notification_email():
941 with patch("couchers.email._queue_email") as mock:
942 yield mock
943 process_jobs()
946@dataclass
947class EmailData:
948 sender_name: str
949 sender_email: str
950 recipient: str
951 subject: str
952 plain: str
953 html: str
954 source_data: str
955 list_unsubscribe_header: str
958def email_fields(mock, call_ix=0):
959 _, kw = mock.call_args_list[call_ix]
960 return EmailData(
961 sender_name=kw.get("sender_name"),
962 sender_email=kw.get("sender_email"),
963 recipient=kw.get("recipient"),
964 subject=kw.get("subject"),
965 plain=kw.get("plain"),
966 html=kw.get("html"),
967 source_data=kw.get("source_data"),
968 list_unsubscribe_header=kw.get("list_unsubscribe_header"),
969 )
972@pytest.fixture
973def push_collector():
974 """
975 See test_SendTestPushNotification for an example on how to use this fixture
976 """
978 class Push:
979 """
980 This allows nice access to the push info via e.g. push.title instead of push["title"]
981 """
983 def __init__(self, kwargs):
984 self.kwargs = kwargs
986 def __getattr__(self, attr):
987 try:
988 return self.kwargs[attr]
989 except KeyError:
990 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None
992 def __repr__(self):
993 kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
994 return f"Push({kwargs_disp})"
996 class PushCollector:
997 def __init__(self):
998 # pairs of (user_id, push)
999 self.pushes = []
1001 def by_user(self, user_id):
1002 return [kwargs for uid, kwargs in self.pushes if uid == user_id]
1004 def push_to_user(self, session, user_id, **kwargs):
1005 self.pushes.append((user_id, Push(kwargs=kwargs)))
1007 def assert_user_has_count(self, user_id, count):
1008 assert len(self.by_user(user_id)) == count
1010 def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
1011 push = self.by_user(user_id)[ix]
1012 for kwarg in kwargs:
1013 assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
1014 assert push.kwargs[kwarg] == kwargs[kwarg], (
1015 f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"
1016 )
1018 def assert_user_has_single_matching(self, user_id, **kwargs):
1019 self.assert_user_has_count(user_id, 1)
1020 self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)
1022 collector = PushCollector()
1024 with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
1025 yield collector