Coverage for src/tests/test_fixtures.py: 98%
636 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1import os
2import re
3from collections.abc import Generator, Sequence
4from concurrent import futures
5from contextlib import contextmanager
6from dataclasses import dataclass
7from datetime import date, timedelta
8from pathlib import Path
9from typing import Any
10from unittest.mock import patch
12import grpc
13import pytest
14from grpc._server import _validate_generic_rpc_handlers
15from sqlalchemy import Connection, Engine, create_engine, update
16from sqlalchemy.orm import Session
17from sqlalchemy.sql import or_, text
19from couchers.config import config
20from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
21from couchers.context import make_interactive_context
22from couchers.crypto import random_hex
23from couchers.db import _get_base_engine, session_scope
24from couchers.descriptor_pool import get_descriptor_pool
25from couchers.interceptors import (
26 CouchersMiddlewareInterceptor,
27 UserAuthInfo,
28 _try_get_and_update_user_details,
29)
30from couchers.jobs.worker import process_job
31from couchers.models import (
32 Base,
33 FriendRelationship,
34 FriendStatus,
35 HostingStatus,
36 LanguageAbility,
37 LanguageFluency,
38 MeetupStatus,
39 ModerationUserList,
40 PassportSex,
41 PhotoGallery,
42 RegionLived,
43 RegionVisited,
44 StrongVerificationAttempt,
45 StrongVerificationAttemptStatus,
46 Upload,
47 User,
48 UserBlock,
49 UserSession,
50)
51from couchers.proto import (
52 account_pb2_grpc,
53 admin_pb2_grpc,
54 annotations_pb2,
55 api_pb2_grpc,
56 auth_pb2_grpc,
57 blocking_pb2_grpc,
58 bugs_pb2_grpc,
59 communities_pb2_grpc,
60 conversations_pb2_grpc,
61 discussions_pb2_grpc,
62 donations_pb2_grpc,
63 editor_pb2_grpc,
64 events_pb2_grpc,
65 galleries_pb2_grpc,
66 gis_pb2_grpc,
67 groups_pb2_grpc,
68 iris_pb2_grpc,
69 jail_pb2_grpc,
70 media_pb2_grpc,
71 moderation_pb2,
72 moderation_pb2_grpc,
73 notifications_pb2_grpc,
74 pages_pb2_grpc,
75 postal_verification_pb2_grpc,
76 public_pb2_grpc,
77 references_pb2_grpc,
78 reporting_pb2_grpc,
79 requests_pb2_grpc,
80 resources_pb2_grpc,
81 search_pb2_grpc,
82 stripe_pb2_grpc,
83 threads_pb2_grpc,
84)
85from couchers.servicers.account import Account, Iris
86from couchers.servicers.admin import Admin
87from couchers.servicers.api import API
88from couchers.servicers.auth import Auth, create_session
89from couchers.servicers.blocking import Blocking
90from couchers.servicers.bugs import Bugs
91from couchers.servicers.communities import Communities
92from couchers.servicers.conversations import Conversations
93from couchers.servicers.discussions import Discussions
94from couchers.servicers.donations import Donations, Stripe
95from couchers.servicers.editor import Editor
96from couchers.servicers.events import Events
97from couchers.servicers.galleries import Galleries
98from couchers.servicers.gis import GIS
99from couchers.servicers.groups import Groups
100from couchers.servicers.jail import Jail
101from couchers.servicers.media import Media, get_media_auth_interceptor
102from couchers.servicers.moderation import Moderation
103from couchers.servicers.notifications import Notifications
104from couchers.servicers.pages import Pages
105from couchers.servicers.postal_verification import PostalVerification
106from couchers.servicers.public import Public
107from couchers.servicers.references import References
108from couchers.servicers.reporting import Reporting
109from couchers.servicers.requests import Requests
110from couchers.servicers.resources import Resources
111from couchers.servicers.search import Search
112from couchers.servicers.threads import Threads
113from couchers.sql import couchers_select as select
114from couchers.utils import create_coordinate, now
def create_schema_from_models(engine: Engine | None = None) -> None:
    """
    Build the full database schema straight from the current SQLAlchemy models,
    rather than replaying migrations incrementally.
    """
    if engine is None:
        engine = _get_base_engine()

    # install the SQL functions first (in production these are created by migrations)
    sql_functions_path = Path(__file__).parent / "sql_functions.sql"
    with open(sql_functions_path) as sql_file, engine.connect() as conn:
        conn.execute(text(sql_file.read()))
        conn.commit()

    Base.metadata.create_all(engine)
def populate_testing_resources(conn: Connection) -> None:
    """
    Testing version of couchers.resources.copy_resources_to_database

    Seeds the minimal static data the app expects: a fixed set of regions,
    a fixed set of languages, and fake timezone areas. Runs three raw SQL
    statements on the given connection; the caller is responsible for
    committing (or using an autocommit connection).
    """
    # regions: ISO 3166-1 alpha-3 code -> display name
    conn.execute(
        text("""
        INSERT INTO regions (code, name) VALUES
        ('AUS', 'Australia'),
        ('CAN', 'Canada'),
        ('CHE', 'Switzerland'),
        ('CUB', 'Cuba'),
        ('CXR', 'Christmas Island'),
        ('CZE', 'Czechia'),
        ('DEU', 'Germany'),
        ('EGY', 'Egypt'),
        ('ESP', 'Spain'),
        ('EST', 'Estonia'),
        ('FIN', 'Finland'),
        ('FRA', 'France'),
        ('GBR', 'United Kingdom'),
        ('GEO', 'Georgia'),
        ('GHA', 'Ghana'),
        ('GRC', 'Greece'),
        ('HKG', 'Hong Kong'),
        ('IRL', 'Ireland'),
        ('ISR', 'Israel'),
        ('ITA', 'Italy'),
        ('JPN', 'Japan'),
        ('LAO', 'Laos'),
        ('MEX', 'Mexico'),
        ('MMR', 'Myanmar'),
        ('NAM', 'Namibia'),
        ('NLD', 'Netherlands'),
        ('NZL', 'New Zealand'),
        ('POL', 'Poland'),
        ('PRK', 'North Korea'),
        ('REU', 'Réunion'),
        ('SGP', 'Singapore'),
        ('SWE', 'Sweden'),
        ('THA', 'Thailand'),
        ('TUR', 'Turkey'),
        ('TWN', 'Taiwan'),
        ('USA', 'United States'),
        ('VNM', 'Vietnam');
        """)
    )

    # Insert languages as textual SQL (ISO 639-3 code -> display name)
    conn.execute(
        text("""
        INSERT INTO languages (code, name) VALUES
        ('arb', 'Arabic (Standard)'),
        ('deu', 'German'),
        ('eng', 'English'),
        ('fin', 'Finnish'),
        ('fra', 'French'),
        ('heb', 'Hebrew'),
        ('hun', 'Hungarian'),
        ('jpn', 'Japanese'),
        ('pol', 'Polish'),
        ('swe', 'Swedish'),
        ('cmn', 'Chinese (Mandarin)')
        """)
    )

    # fake timezone areas: a stand-in for the real (huge) timezone geometry dump
    with open(Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake", "r") as f:
        tz_sql = f.read()

    conn.execute(text(tz_sql))
def drop_database() -> None:
    """Tear down and recreate the public/logging schemas and required extensions."""
    with session_scope() as session:
        # postgis is required for all the Geographic Information System (GIS) stuff
        # pg_trgm is required for trigram-based search
        # btree_gist is required for gist-based exclusion constraints
        statements = [
            "DROP SCHEMA IF EXISTS public CASCADE;",
            "DROP SCHEMA IF EXISTS logging CASCADE;",
            "DROP EXTENSION IF EXISTS postgis CASCADE;",
            "CREATE SCHEMA IF NOT EXISTS public;",
            "CREATE SCHEMA IF NOT EXISTS logging;",
            "CREATE EXTENSION postgis;",
            "CREATE EXTENSION pg_trgm;",
            "CREATE EXTENSION btree_gist;",
        ]
        session.execute(text("".join(statements)))
@contextmanager
def autocommit_engine(url: str) -> Generator[Engine]:
    """
    An engine that executes every statement in AUTOCOMMIT mode, i.e. *outside*
    any transaction. Mainly needed because CREATE/DROP DATABASE cannot be
    executed inside a transaction.

    The engine is always disposed on exit, even if the caller's body raises.
    """
    engine = create_engine(
        url,
        isolation_level="AUTOCOMMIT",
    )
    try:
        yield engine
    finally:
        # without the finally, an exception in the caller would leak the pool
        engine.dispose()
@pytest.fixture(scope="session")
def postgres_engine() -> Generator[Engine]:
    """
    Yield a SQLAlchemy engine connected to the "postgres" maintenance database.
    """
    dsn = config["DATABASE_CONNECTION_STRING"]
    if not dsn.endswith("/testdb"):
        raise RuntimeError(f"DATABASE_CONNECTION_STRING must point to /testdb, but was {dsn}")

    # same server, but the "postgres" database instead of "testdb"
    maintenance_dsn = re.sub(r"/testdb$", "/postgres", dsn)

    with autocommit_engine(maintenance_dsn) as engine:
        yield engine
@pytest.fixture(scope="session")
def postgres_conn(postgres_engine: Engine) -> Generator[Connection]:
    """
    A single cached connection to the "postgres" database: acquiring a
    connection takes time, so one is shared for the whole test session.
    """
    with postgres_engine.connect() as connection:
        yield connection
@pytest.fixture(scope="session")
def template_db(postgres_conn: Connection) -> str:
    """
    Build a template database holding the extensions, tables, and static data
    (languages, regions) exactly once per test session.

    Each test then clones this template, which is far cheaper than creating a
    database from scratch or truncating every table between tests. The schema
    comes from the SQLAlchemy metadata, not from running migrations — again,
    for speed.
    """
    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    name = "couchers_template"

    postgres_conn.execute(text(f"DROP DATABASE IF EXISTS {name}"))
    postgres_conn.execute(text(f"CREATE DATABASE {name}"))

    template_dsn = re.sub(r"/testdb$", f"/{name}", config["DATABASE_CONNECTION_STRING"])

    with autocommit_engine(template_dsn) as engine, engine.connect() as conn:
        conn.execute(
            text(
                "CREATE SCHEMA logging;"
                "CREATE EXTENSION IF NOT EXISTS postgis;"
                "CREATE EXTENSION IF NOT EXISTS pg_trgm;"
                "CREATE EXTENSION IF NOT EXISTS btree_gist;"
            )
        )
        create_schema_from_models(engine)
        populate_testing_resources(conn)

    return name
@pytest.fixture
def db(template_db: str, postgres_conn: Connection) -> None:
    """
    Give the test a pristine testdb cloned from the template database, which
    already has the migrations applied and the static data (regions,
    languages, etc.) loaded.
    """
    for statement in (
        "DROP DATABASE IF EXISTS testdb WITH (FORCE)",
        f"CREATE DATABASE testdb WITH TEMPLATE {template_db}",
    ):
        postgres_conn.execute(text(statement))
@pytest.fixture(scope="class")
def db_class(template_db: str, postgres_conn: Connection) -> None:
    """
    Class-scoped variant of the `db` fixture. Used in test_communities.py.
    """
    for statement in (
        "DROP DATABASE IF EXISTS testdb WITH (FORCE)",
        f"CREATE DATABASE testdb WITH TEMPLATE {template_db}",
    ):
        postgres_conn.execute(text(statement))
class _MockCouchersContext:
    """Minimal stand-in for a Couchers context: exposes empty request headers."""

    @property
    def headers(self):
        # the mock carries no request headers
        return {}
def generate_user(
    *,
    delete_user=False,
    complete_profile=True,
    strong_verification=False,
    regions_visited: Sequence[str] = (),
    regions_lived: Sequence[str] = (),
    language_abilities: Sequence[tuple[str, LanguageFluency]] = (),
    **kwargs: Any,
) -> tuple[User, str]:
    """
    Create a new user, return the user and a session token.

    The user is detached from any session: you can access its static
    attributes, but you can't modify it.

    Use this most of the time.

    Args:
        delete_user: mark the user deleted after the session is created
        complete_profile: add an avatar and a long about_me so the profile
            counts as complete
        strong_verification: attach a succeeded StrongVerificationAttempt
        regions_visited: region codes to record as visited
        regions_lived: region codes to record as lived in
        language_abilities: (language_code, fluency) pairs to attach
        **kwargs: overrides for any of the default User column values below

    Returns:
        (user, token) where token authenticates as the new user.
    """
    with session_scope() as session:
        # Ensure superusers are also editors (DB constraint)
        if kwargs.get("is_superuser") and "is_editor" not in kwargs:
            kwargs["is_editor"] = True

        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "Woman",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "last_donated": now(),
        } | kwargs

        user = User(**user_opts)
        session.add(user)
        session.flush()

        # Create a profile gallery for the user and link it
        profile_gallery = PhotoGallery(owner_user_id=user.id)
        session.add(profile_gallery)
        session.flush()
        user.profile_gallery_id = profile_gallery.id

        for region in regions_visited:
            session.add(RegionVisited(user_id=user.id, region_code=region))

        for region in regions_lived:
            session.add(RegionLived(user_id=user.id, region_code=region))

        for lang, fluency in language_abilities:
            session.add(LanguageAbility(user_id=user.id, language_code=lang, fluency=fluency))

        # this expires the user, so now it's "dirty"
        token, _ = create_session(_MockCouchersContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        # deterministic per-user score: newer users (higher id) rank slightly lower
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # an avatar upload plus a long about_me makes has_completed_profile true
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        if strong_verification:
            # fabricate a succeeded passport verification consistent with the user's data
            attempt = StrongVerificationAttempt(
                verification_attempt_token=f"verification_attempt_token_{user.id}",
                user_id=user.id,
                status=StrongVerificationAttemptStatus.succeeded,
                has_full_data=True,
                passport_encrypted_data=b"not real",
                passport_date_of_birth=user.birthdate,
                passport_sex={"Woman": PassportSex.female, "Man": PassportSex.male}.get(
                    user.gender, PassportSex.unspecified
                ),
                has_minimal_data=True,
                passport_expiry_date=date.today() + timedelta(days=10),
                passport_nationality="UTO",
                passport_last_three_document_chars=f"{user.id:03}",
                iris_token=f"iris_token_{user.id}",
                iris_session_id=user.id,
            )
            session.add(attempt)
            session.flush()
            assert attempt.has_strong_verification(user)

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session: Session, username: str) -> tuple[int, str]:
    """Look up a user's id and their session token by username."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token
def make_friends(user1: User, user2: User) -> None:
    """Create an accepted friend relationship from user1 to user2."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1: User, user2: User) -> None:
    """Make user1 block user2."""
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
def make_user_invisible(user_id: int) -> None:
    """Hide a user by setting their is_banned flag."""
    with session_scope() as session:
        ban_stmt = update(User).where(User.id == user_id).values(is_banned=True)
        session.execute(ban_stmt)
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1: User, user2: User) -> FriendRelationship:
    """
    Return the FriendRelationship between user1 and user2 (in either direction),
    detached from the session, or None if there isn't one.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # NOTE: these must be SQL conjunctions (&), not Python `and`:
                    # `a and b` on SQLAlchemy expressions silently discards one
                    # side of each condition instead of producing an AND clause
                    (FriendRelationship.from_user_id == user1.id) & (FriendRelationship.to_user_id == user2.id),
                    (FriendRelationship.from_user_id == user2.id) & (FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        session.expunge(friend_relationship)
        return friend_relationship
def add_users_to_new_moderation_list(users: list[User]) -> int:
    """Group users as duplicated accounts; returns the new list's id."""
    with session_scope() as session:
        user_list = ModerationUserList()
        session.add(user_list)
        session.flush()
        for member in users:
            # re-fetch in this session so the relationship append is tracked
            user_list.users.append(session.get(User, member.id))
        return user_list.id
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token: str):
        self.token = token

    def __call__(self, context, callback) -> None:
        session_cookie = ("cookie", f"couchers-sesh={self.token}")
        callback((session_cookie,), None)
class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Client interceptor that records the initial metadata of the latest call."""

    def __init__(self):
        self.latest_headers = {}

    def intercept_unary_unary(self, continuation, client_call_details, request):
        call = continuation(client_call_details, request)
        metadata = call.initial_metadata()
        self.latest_headers = dict(metadata)
        self.latest_header_raw = metadata
        return call
@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, grpc.UnaryUnaryClientInterceptor]]:
    """
    Yield an Auth API stub plus a metadata-capturing interceptor.

    This uses a real gRPC server because the auth flow manipulates headers.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as raw_channel:
                keeper = _MetadataKeeperInterceptor()
                intercepted_channel = grpc.intercept_channel(raw_channel, keeper)
                yield auth_pb2_grpc.AuthStub(intercepted_channel), keeper
        finally:
            server.stop(None).wait()
@contextmanager
def api_session(token):
    """
    Yield an API stub backed by an in-process FakeChannel, authed via token.
    """
    fake_channel = FakeChannel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake_channel)
    yield api_pb2_grpc.APIStub(fake_channel)
@contextmanager
def real_api_session(token):
    """
    Yield an API stub over a real TCP gRPC server, authed via the token cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """
    Yield an Admin service stub over a real TCP gRPC server, authed via the token cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_editor_session(token):
    """
    Yield an Editor service stub over a real TCP gRPC server, authed via the token cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield editor_pb2_grpc.EditorStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_moderation_session(token):
    """
    Yield a Moderation service stub over a real TCP gRPC server, authed via the token cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield moderation_pb2_grpc.ModerationStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_account_session(token: str):
    """
    Yield an Account service stub over a real TCP gRPC server, authed via the token cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_jail_session(token: str):
    """
    Yield a Jail service stub over a real TCP gRPC server, authed via the token cookie.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", channel_creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def gis_session(token):
    """Yield a GIS service stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake_channel)
    yield gis_pb2_grpc.GISStub(fake_channel)
@contextmanager
def public_session():
    """Yield a Public service stub backed by an unauthenticated FakeChannel."""
    fake_channel = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake_channel)
    yield public_pb2_grpc.PublicStub(fake_channel)
class FakeRpcError(grpc.RpcError):
    """grpc.RpcError double that reports a preset status code and details."""

    def __init__(self, code, details):
        self._code = code
        self._details = details

    def code(self):
        """Return the stored status code."""
        return self._code

    def details(self):
        """Return the stored details string."""
        return self._details
def _check_user_perms(method: str, user_id: int, is_jailed: bool, is_editor: bool, is_superuser: bool) -> None:
    """
    Assert that a caller with the given auth state may call this method.

    The required auth level is read from the proto service options declared on
    the service that owns `method`.

    Args:
        method: full gRPC path of the form "/org.couchers.api.core.API/GetUser"
        user_id: the calling user's id, or falsy if unauthenticated
        is_jailed, is_editor, is_superuser: the caller's flags (None if unauthenticated)

    Raises:
        AssertionError: if the caller does not satisfy the declared auth level.
    """
    # method is of the form "/org.couchers.api.core.API/GetUser";
    # the method name itself isn't needed, only the service name
    _, service_name, _method_name = method.split("/")

    service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = service_options.Extensions[annotations_pb2.auth_level]
    # every service must declare an explicit, known auth level
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_EDITOR,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    if not user_id:
        # anonymous callers may only hit open APIs
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
    else:
        assert not (auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser), (
            "Non-superuser tried to call superuser API"
        )
        assert not (auth_level == annotations_pb2.AUTH_LEVEL_EDITOR and not is_editor), (
            "Non-editor tried to call editor API"
        )
        assert not (
            is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED]
        ), "User is jailed but tried to call non-open/non-jailed API"
class MockGrpcContext:
    """
    Pure in-memory mock of grpc.ServicerContext for testing.
    """

    def __init__(self):
        # metadata the servicer sends / the metadata the "client" supplied
        self._initial_metadata = []
        self._invocation_metadata = []

    def abort(self, code, details):
        """Raise instead of aborting the (nonexistent) underlying RPC."""
        raise FakeRpcError(code, details)

    def invocation_metadata(self):
        return self._invocation_metadata

    def send_initial_metadata(self, metadata):
        self._initial_metadata.extend(metadata)
class FakeChannel:
    """
    Mock gRPC channel for testing that orchestrates context creation.

    This holds test state (token) and creates proper CouchersContext
    instances when handlers are invoked.
    """

    def __init__(self, token=None):
        # maps full method URI -> the registered RpcMethodHandler
        self.handlers = {}
        # session token used to authenticate each call; None means anonymous
        self._token = token

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        # reuse grpc's own validation so misregistered servicers fail the same
        # way they would on a real server
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        # Called by generated stubs; returns a callable that mimics invoking
        # the unary-unary method `uri` against the registered servicer.
        handler = self.handlers[uri]

        def fake_handler(request):
            # resolve the token into user details, as the real interceptor would
            auth_info: UserAuthInfo | None = None
            if self._token:
                auth_info = _try_get_and_update_user_details(
                    self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
                )

            # mirror the server-side auth-level checks so tests catch
            # permission mistakes even without a real server
            _check_user_perms(
                uri,
                auth_info.user_id if auth_info else None,
                auth_info.is_jailed if auth_info else None,
                auth_info.is_editor if auth_info else None,
                auth_info.is_superuser if auth_info else None,
            )

            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                mock_grpc_ctx = MockGrpcContext()

                context = make_interactive_context(
                    grpc_context=mock_grpc_ctx,
                    user_id=auth_info.user_id if auth_info else None,
                    is_api_key=False,
                    token=self._token if auth_info else None,
                    ui_language_preference=auth_info.ui_language_preference if auth_info else None,
                )

                response = handler.unary_unary(request, context, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler
@contextmanager
def conversations_session(token):
    """
    Yield a Conversations stub backed by an in-process FakeChannel, authed via token.
    """
    fake_channel = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake_channel)
    yield conversations_pb2_grpc.ConversationsStub(fake_channel)
@contextmanager
def requests_session(token):
    """
    Yield a Requests stub backed by an in-process FakeChannel, authed via token.
    """
    fake_channel = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake_channel)
    yield requests_pb2_grpc.RequestsStub(fake_channel)
@contextmanager
def threads_session(token):
    """Yield a Threads stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake_channel)
    yield threads_pb2_grpc.ThreadsStub(fake_channel)
@contextmanager
def discussions_session(token):
    """Yield a Discussions stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake_channel)
    yield discussions_pb2_grpc.DiscussionsStub(fake_channel)
@contextmanager
def donations_session(token):
    """Yield a Donations stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake_channel)
    yield donations_pb2_grpc.DonationsStub(fake_channel)
@contextmanager
def real_stripe_session():
    """
    Yield a Stripe service stub over a real TCP gRPC server (no user auth needed).
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_iris_session():
    """Yield an Iris service stub over a real TCP gRPC server (no user auth needed)."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def pages_session(token):
    """Yield a Pages stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake_channel)
    yield pages_pb2_grpc.PagesStub(fake_channel)
@contextmanager
def communities_session(token):
    """Yield a Communities stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake_channel)
    yield communities_pb2_grpc.CommunitiesStub(fake_channel)
@contextmanager
def groups_session(token):
    """Yield a Groups stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake_channel)
    yield groups_pb2_grpc.GroupsStub(fake_channel)
@contextmanager
def blocking_session(token):
    """Yield a Blocking stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake_channel)
    yield blocking_pb2_grpc.BlockingStub(fake_channel)
@contextmanager
def notifications_session(token):
    """Yield a Notifications stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake_channel)
    yield notifications_pb2_grpc.NotificationsStub(fake_channel)
@contextmanager
def account_session(token):
    """
    Yield an Account stub backed by an in-process FakeChannel, authed via token.
    """
    fake_channel = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake_channel)
    yield account_pb2_grpc.AccountStub(fake_channel)
@contextmanager
def search_session(token):
    """
    Yield a Search stub backed by an in-process FakeChannel, authed via token.
    """
    fake_channel = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake_channel)
    yield search_pb2_grpc.SearchStub(fake_channel)
@contextmanager
def references_session(token):
    """
    Yield a References stub backed by an in-process FakeChannel, authed via token.
    """
    fake_channel = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake_channel)
    yield references_pb2_grpc.ReferencesStub(fake_channel)
@contextmanager
def galleries_session(token):
    """
    Yield a Galleries stub backed by an in-process FakeChannel, authed via token.
    """
    fake_channel = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake_channel)
    yield galleries_pb2_grpc.GalleriesStub(fake_channel)
@contextmanager
def reporting_session(token):
    """Yield a Reporting stub backed by an in-process FakeChannel, authed via token."""
    fake_channel = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake_channel)
    yield reporting_pb2_grpc.ReportingStub(fake_channel)
@contextmanager
def events_session(token):
    """Yield an Events API stub wired to a fake channel authed with `token`."""
    chan = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    yield events_pb2_grpc.EventsStub(chan)
@contextmanager
def postal_verification_session(token):
    """Yield a PostalVerification API stub wired to a fake channel authed with `token`."""
    chan = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), chan)
    yield postal_verification_pb2_grpc.PostalVerificationStub(chan)
@contextmanager
def bugs_session(token=None):
    """Yield a Bugs API stub; auth token is optional since bug reports may be anonymous."""
    chan = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    yield bugs_pb2_grpc.BugsStub(chan)
@contextmanager
def resources_session():
    """Yield a Resources API stub; the resources API needs no auth."""
    chan = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    yield resources_pb2_grpc.ResourcesStub(chan)
@contextmanager
def media_session(bearer_token):
    """
    Spin up a real (local, secure) Media server for testing and yield a stub.

    The server authenticates requests with `bearer_token` via the media auth
    interceptor, and the client channel attaches the same token as call
    credentials.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as thread_pool:
        media_server = grpc.server(thread_pool, interceptors=[auth_interceptor])
        bound_port = media_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), media_server)
        media_server.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as chan:
                yield media_pb2_grpc.MediaStub(chan)
        finally:
            # make sure the server is fully shut down before the executor exits
            media_server.stop(None).wait()
@pytest.fixture(scope="class")
def testconfig():
    """
    Install a known-good test configuration into the global `config` dict,
    restoring the previous configuration on teardown.
    """
    saved_config = config.copy()
    config.clear()
    config.update(saved_config)

    config.update(
        {
            "IN_TEST": True,
            "DEV": True,
            "SECRET": bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b"),
            "VERSION": "testing_version",
            "BASE_URL": "http://localhost:3000",
            "BACKEND_BASE_URL": "http://localhost:8888",
            "CONSOLE_BASE_URL": "http://localhost:8888",
            "COOKIE_DOMAIN": "localhost",
            "ENABLE_SMS": False,
            "SMS_SENDER_ID": "invalid",
            "ENABLE_EMAIL": False,
            "NOTIFICATION_EMAIL_SENDER": "Couchers.org",
            "NOTIFICATION_EMAIL_ADDRESS": "notify@couchers.org.invalid",
            "NOTIFICATION_PREFIX": "[TEST] ",
            "REPORTS_EMAIL_RECIPIENT": "reports@couchers.org.invalid",
            "CONTRIBUTOR_FORM_EMAIL_RECIPIENT": "forms@couchers.org.invalid",
            "MODS_EMAIL_RECIPIENT": "mods@couchers.org.invalid",
            "ENABLE_DONATIONS": False,
            "STRIPE_API_KEY": "",
            "STRIPE_WEBHOOK_SECRET": "",
            "STRIPE_RECURRING_PRODUCT_ID": "",
            "ENABLE_STRONG_VERIFICATION": False,
            "IRIS_ID_PUBKEY": "",
            "IRIS_ID_SECRET": "",
            # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
            "VERIFICATION_DATA_PUBLIC_KEY": bytes.fromhex(
                "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
            ),
            "ENABLE_POSTAL_VERIFICATION": False,
            "SMTP_HOST": "localhost",
            "SMTP_PORT": 587,
            "SMTP_USERNAME": "username",
            "SMTP_PASSWORD": "password",
            "ENABLE_MEDIA": True,
            "MEDIA_SERVER_SECRET_KEY": bytes.fromhex(
                "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
            ),
            "MEDIA_SERVER_BEARER_TOKEN": "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e",
            "MEDIA_SERVER_BASE_URL": "http://localhost:5001",
            "MEDIA_SERVER_UPLOAD_BASE_URL": "http://localhost:5001",
            "BUG_TOOL_ENABLED": False,
            "BUG_TOOL_GITHUB_REPO": "org/repo",
            "BUG_TOOL_GITHUB_USERNAME": "user",
            "BUG_TOOL_GITHUB_TOKEN": "token",
            "LISTMONK_ENABLED": False,
            "LISTMONK_BASE_URL": "https://localhost",
            "LISTMONK_API_USERNAME": "...",
            "LISTMONK_API_KEY": "...",
            "LISTMONK_LIST_ID": 3,
            "PUSH_NOTIFICATIONS_ENABLED": True,
            "PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY": "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI",
            "PUSH_NOTIFICATIONS_VAPID_SUBJECT": "mailto:testing@couchers.org.invalid",
            "ACTIVENESS_PROBES_ENABLED": True,
            # NOTE(review): "RECAPTHCA" spelling kept as-is — it matches the key
            # names used by the rest of the codebase; confirm before renaming
            "RECAPTHCA_ENABLED": False,
            "RECAPTHCA_PROJECT_ID": "...",
            "RECAPTHCA_API_KEY": "...",
            "RECAPTHCA_SITE_KEY": "...",
            "EXPERIMENTATION_ENABLED": False,
            "EXPERIMENTATION_PASS_ALL_GATES": True,
            "STATSIG_SERVER_SECRET_KEY": "",
            "STATSIG_ENVIRONMENT": "testing",
            # Moderation auto-approval deadline - 0 disables, set in tests that need it
            "MODERATION_AUTO_APPROVE_DEADLINE_SECONDS": 0,
            # Bot user ID for automated moderation - will be set to a real user in tests that need it
            "MODERATION_BOT_USER_ID": 1,
            # Dev APIs disabled by default in tests
            "ENABLE_DEV_APIS": False,
        }
    )

    yield None

    config.clear()
    config.update(saved_config)
def run_migration_test():
    """Return True when the RUN_MIGRATION_TEST env var is set to "true" (case-insensitive)."""
    flag = os.environ.get("RUN_MIGRATION_TEST", "false")
    return flag.lower() == "true"
@pytest.fixture
def fast_passwords():
    """
    Speed up tests by replacing the (deliberately slow) password hashing
    primitives with trivially fast fakes for the duration of the test.
    """

    def _hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def _verify(hashed: bytes, password: bytes) -> bool:
        return hashed == _hash(password)

    with patch("couchers.crypto.nacl.pwhash.verify", _verify), patch("couchers.crypto.nacl.pwhash.str", _hash):
        yield
def process_jobs():
    """Drain the background job queue, running jobs one at a time until none remain."""
    while True:
        if not process_job():
            break
@contextmanager
def mock_notification_email():
    """Patch the email queue and yield the mock; drains background jobs on exit."""
    with patch("couchers.email._queue_email") as queue_mock:
        yield queue_mock
        # run the queued jobs before unpatching so no real email is ever sent
        process_jobs()
@dataclass
class EmailData:
    """The keyword arguments of one captured `_queue_email` call."""

    sender_name: str
    sender_email: str
    recipient: str
    subject: str
    plain: str
    html: str
    source_data: str
    list_unsubscribe_header: str


def email_fields(mock, call_ix=0):
    """Unpack the `call_ix`-th captured email from `mock` into an EmailData.

    Fields absent from the call's kwargs come through as None (via dict.get).
    """
    _, call_kwargs = mock.call_args_list[call_ix]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    return EmailData(**{name: call_kwargs.get(name) for name in field_names})
class Push:
    """
    Wraps the kwargs of a single push notification so fields can be read as
    attributes, e.g. push.title instead of push["title"].
    """

    def __init__(self, kwargs):
        self.kwargs = kwargs

    def __getattr__(self, attr):
        # only invoked when normal attribute lookup fails, so self.kwargs is safe
        if attr in self.kwargs:
            return self.kwargs[attr]
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'")

    def __repr__(self):
        parts = [f"'{key}'='{val}'" for key, val in self.kwargs.items()]
        return f"Push({', '.join(parts)})"
class PushCollector:
    """Collects push notifications sent during a test so assertions can be made on them."""

    def __init__(self):
        # list of (user_id, Push) pairs, in send order
        self.pushes = []

    def by_user(self, user_id):
        """Return all pushes recorded for the given user, in order."""
        return [push for recipient_id, push in self.pushes if recipient_id == user_id]

    def push_to_user(self, session, user_id, **kwargs):
        """Stand-in for the real push sender: records the push instead of sending it."""
        self.pushes.append((user_id, Push(kwargs=kwargs)))

    def assert_user_has_count(self, user_id, count):
        assert len(self.by_user(user_id)) == count

    def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
        push = self.by_user(user_id)[ix]
        for field, expected in kwargs.items():
            assert field in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{field}'"
            actual = push.kwargs[field]
            assert actual == expected, (
                f"Push notification {user_id=}, {ix=} mismatch in field '{field}', expected '{expected}' but got '{actual}'"
            )

    def assert_user_has_single_matching(self, user_id, **kwargs):
        self.assert_user_has_count(user_id, 1)
        self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)
@pytest.fixture
def push_collector():
    """
    Patches push sending so notifications are recorded in a PushCollector instead
    of being sent; see test_SendTestPushNotification for example usage.
    """
    collector = PushCollector()
    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector
class Moderator:
    """
    A test fixture that provides a moderator user and methods to exercise the moderation API.

    Usage:
        def test_example(db, moderator):
            user, token = generate_user()
            # ... create a host request ...
            moderator.approve_host_request(host_request_id)
    """

    def __init__(self, user: User, token: str):
        self.user = user
        self.token = token

    def _approve_as_visible(self, object_type, object_id: int, reason: str) -> None:
        """Look up the moderation state for the given object and approve it as visible."""
        with real_moderation_session(self.token) as api:
            state_res = api.GetModerationState(
                moderation_pb2.GetModerationStateReq(
                    object_type=object_type,
                    object_id=object_id,
                )
            )
            api.ModerateContent(
                moderation_pb2.ModerateContentReq(
                    moderation_state_id=state_res.moderation_state.moderation_state_id,
                    action=moderation_pb2.MODERATION_ACTION_APPROVE,
                    visibility=moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
                    reason=reason,
                )
            )

    def approve_host_request(self, host_request_id: int, reason: str = "Test approval") -> None:
        """
        Approve a host request using the moderation API.

        Args:
            host_request_id: The conversation_id of the host request
            reason: Optional reason for approval
        """
        self._approve_as_visible(moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST, host_request_id, reason)

    def approve_group_chat(self, group_chat_id: int, reason: str = "Test approval") -> None:
        """
        Approve a group chat using the moderation API.

        Args:
            group_chat_id: The conversation_id of the group chat
            reason: Optional reason for approval
        """
        self._approve_as_visible(moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT, group_chat_id, reason)
@pytest.fixture
def moderator():
    """
    Creates a moderator (superuser) and provides methods to exercise the moderation API.

    Usage:
        def test_example(db, moderator):
            # ... create a host request ...
            moderator.approve_host_request(host_request_id)
    """
    mod_user, mod_token = generate_user(is_superuser=True)
    yield Moderator(mod_user, mod_token)