Coverage for src/tests/test_fixtures.py: 98%
501 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-10-21 08:09 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-10-21 08:09 +0000
import os
from concurrent import futures
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from unittest.mock import patch

import grpc
import pytest
from sqlalchemy.orm import close_all_sessions
from sqlalchemy.sql import and_, or_, text
14from couchers.config import config
15from couchers.constants import GUIDELINES_VERSION, TOS_VERSION
16from couchers.crypto import random_hex
17from couchers.db import _get_base_engine, session_scope
18from couchers.descriptor_pool import get_descriptor_pool
19from couchers.interceptors import AuthValidatorInterceptor, SessionInterceptor, _try_get_and_update_user_details
20from couchers.jobs.worker import process_job
21from couchers.models import (
22 Base,
23 FriendRelationship,
24 FriendStatus,
25 HostingStatus,
26 Language,
27 LanguageAbility,
28 LanguageFluency,
29 MeetupStatus,
30 Region,
31 RegionLived,
32 RegionVisited,
33 Upload,
34 User,
35 UserBlock,
36 UserSession,
37)
38from couchers.servicers.account import Account, Iris
39from couchers.servicers.admin import Admin
40from couchers.servicers.api import API
41from couchers.servicers.auth import Auth, create_session
42from couchers.servicers.blocking import Blocking
43from couchers.servicers.bugs import Bugs
44from couchers.servicers.communities import Communities
45from couchers.servicers.conversations import Conversations
46from couchers.servicers.discussions import Discussions
47from couchers.servicers.donations import Donations, Stripe
48from couchers.servicers.events import Events
49from couchers.servicers.groups import Groups
50from couchers.servicers.jail import Jail
51from couchers.servicers.media import Media, get_media_auth_interceptor
52from couchers.servicers.notifications import Notifications
53from couchers.servicers.pages import Pages
54from couchers.servicers.references import References
55from couchers.servicers.reporting import Reporting
56from couchers.servicers.requests import Requests
57from couchers.servicers.resources import Resources
58from couchers.servicers.search import Search
59from couchers.servicers.threads import Threads
60from couchers.sql import couchers_select as select
61from couchers.utils import create_coordinate, now
62from proto import (
63 account_pb2_grpc,
64 admin_pb2_grpc,
65 annotations_pb2,
66 api_pb2_grpc,
67 auth_pb2_grpc,
68 blocking_pb2_grpc,
69 bugs_pb2_grpc,
70 communities_pb2_grpc,
71 conversations_pb2_grpc,
72 discussions_pb2_grpc,
73 donations_pb2_grpc,
74 events_pb2_grpc,
75 groups_pb2_grpc,
76 iris_pb2_grpc,
77 jail_pb2_grpc,
78 media_pb2_grpc,
79 notifications_pb2_grpc,
80 pages_pb2_grpc,
81 references_pb2_grpc,
82 reporting_pb2_grpc,
83 requests_pb2_grpc,
84 resources_pb2_grpc,
85 search_pb2_grpc,
86 stripe_pb2_grpc,
87 threads_pb2_grpc,
88)
def drop_all():
    """drop everything currently in the database"""
    # postgis is required for all the Geographic Information System (GIS) stuff
    # pg_trgm is required for trigram based search
    # btree_gist is required for gist-based exclusion constraints
    ddl = (
        "DROP SCHEMA IF EXISTS public CASCADE;"
        "DROP SCHEMA IF EXISTS logging CASCADE;"
        "DROP EXTENSION IF EXISTS postgis CASCADE;"
        "CREATE SCHEMA public;"
        "CREATE SCHEMA logging;"
        "CREATE EXTENSION postgis;"
        "CREATE EXTENSION pg_trgm;"
        "CREATE EXTENSION btree_gist;"
    )
    with session_scope() as session:
        session.execute(text(ddl))

    # this resets the database connection pool, which caches some stuff postgres-side about objects and will otherwise
    # sometimes error out with "ERROR: no spatial operator found for 'st_contains': opfamily 203699 type 203585"
    # and similar errors
    _get_base_engine().dispose()

    close_all_sessions()
def create_schema_from_models():
    """
    Create everything from the current models, not incrementally
    through migrations.
    """
    # install the custom slugify SQL function before creating tables
    slugify_sql = (Path(__file__).parent / "slugify.sql").read_text()
    with session_scope() as session:
        session.execute(text(slugify_sql))

    Base.metadata.create_all(_get_base_engine())
def populate_testing_resources(session):
    """
    Testing version of couchers.resources.copy_resources_to_database
    """
    regions = [
        ("AUS", "Australia"),
        ("CAN", "Canada"),
        ("CHE", "Switzerland"),
        ("CUB", "Cuba"),
        ("CXR", "Christmas Island"),
        ("CZE", "Czechia"),
        ("DEU", "Germany"),
        ("EGY", "Egypt"),
        ("ESP", "Spain"),
        ("EST", "Estonia"),
        ("FIN", "Finland"),
        ("FRA", "France"),
        ("GBR", "United Kingdom"),
        ("GEO", "Georgia"),
        ("GHA", "Ghana"),
        ("GRC", "Greece"),
        ("HKG", "Hong Kong"),
        ("IRL", "Ireland"),
        ("ISR", "Israel"),
        ("ITA", "Italy"),
        ("JPN", "Japan"),
        ("LAO", "Laos"),
        ("MEX", "Mexico"),
        ("MMR", "Myanmar"),
        ("NAM", "Namibia"),
        ("NLD", "Netherlands"),
        ("NZL", "New Zealand"),
        ("POL", "Poland"),
        ("PRK", "North Korea"),
        ("REU", "Réunion"),
        ("SGP", "Singapore"),
        ("SWE", "Sweden"),
        ("THA", "Thailand"),
        ("TUR", "Turkey"),
        ("TWN", "Taiwan"),
        ("USA", "United States"),
        ("VNM", "Vietnam"),
    ]

    languages = [
        ("arb", "Arabic (Standard)"),
        ("deu", "German"),
        ("eng", "English"),
        ("fin", "Finnish"),
        ("fra", "French"),
        ("heb", "Hebrew"),
        ("hun", "Hungarian"),
        ("jpn", "Japanese"),
        ("pol", "Polish"),
        ("swe", "Swedish"),
        ("cmn", "Chinese (Mandarin)"),
    ]

    # the fake timezone areas file keeps the fixture fast compared to the real one
    tz_sql = (Path(__file__).parent / ".." / ".." / "resources" / "timezone_areas.sql-fake").read_text()

    session.add_all(Region(code=code, name=name) for code, name in regions)
    session.add_all(Language(code=code, name=name) for code, name in languages)

    session.execute(text(tz_sql))
def recreate_database():
    """
    Connect to a running Postgres database, build it using metadata.create_all()
    """
    # running in non-UTC catches some timezone errors
    os.environ["TZ"] = "America/New_York"

    # wipe whatever is there, then rebuild straight from the models (no migrations)
    drop_all()
    create_schema_from_models()

    with session_scope() as session:
        populate_testing_resources(session)
@pytest.fixture()
def db():
    """
    Pytest fixture that wipes the running Postgres database and rebuilds it from the current models.
    """
    recreate_database()
def generate_user(*, delete_user=False, complete_profile=True, **kwargs):
    """
    Create a new user, return session token

    The user is detached from any session, and you can access its static attributes, but you can't modify it

    Use this most of the time

    Keyword args other than delete_user/complete_profile override the default
    User column values below (e.g. generate_user(username="bob")).
    """
    # NOTE(review): this Auth() instance appears unused in this function — candidate for removal
    auth = Auth()

    with session_scope() as session:
        # default args
        username = "test_user_" + random_hex(16)
        user_opts = {
            "username": username,
            "email": f"{username}@dev.couchers.org",
            # password is just 'password'
            # this is hardcoded because the password is slow to hash (so would slow down tests otherwise)
            "hashed_password": b"$argon2id$v=19$m=65536,t=2,p=1$4cjGg1bRaZ10k+7XbIDmFg$tZG7JaLrkfyfO7cS233ocq7P8rf3znXR7SAfUt34kJg",
            "name": username.capitalize(),
            "hosting_status": HostingStatus.cant_host,
            "meetup_status": MeetupStatus.open_to_meetup,
            "city": "Testing city",
            "hometown": "Test hometown",
            "community_standing": 0.5,
            "birthdate": date(year=2000, month=1, day=1),
            "gender": "N/A",
            "pronouns": "",
            "occupation": "Tester",
            "education": "UST(esting)",
            "about_me": "I test things",
            "my_travels": "Places",
            "things_i_like": "Code",
            "about_place": "My place has a lot of testing paraphenelia",
            "additional_information": "I can be a bit testy",
            # you need to make sure to update this logic to make sure the user is jailed/not on request
            "accepted_tos": TOS_VERSION,
            "accepted_community_guidelines": GUIDELINES_VERSION,
            "geom": create_coordinate(40.7108, -73.9740),
            "geom_radius": 100,
            "onboarding_emails_sent": 1,
            "last_onboarding_email_sent": now(),
            "has_donated": True,
        }

        # caller-supplied overrides win over the defaults above
        for key, value in kwargs.items():
            user_opts[key] = value

        user = User(**user_opts)
        session.add(user)
        # flush so user.id is assigned before creating dependent rows
        session.flush()

        # some default regions visited...
        session.add(RegionVisited(user_id=user.id, region_code="CHE"))
        session.add(RegionVisited(user_id=user.id, region_code="REU"))
        session.add(RegionVisited(user_id=user.id, region_code="FIN"))

        # ...and lived in
        session.add(RegionLived(user_id=user.id, region_code="ESP"))
        session.add(RegionLived(user_id=user.id, region_code="FRA"))
        session.add(RegionLived(user_id=user.id, region_code="EST"))

        session.add(LanguageAbility(user_id=user.id, language_code="fin", fluency=LanguageFluency.fluent))
        session.add(LanguageAbility(user_id=user.id, language_code="fra", fluency=LanguageFluency.beginner))

        # this expires the user, so now it's "dirty"
        session.commit()

        class _DummyContext:
            # minimal stand-in for a grpc context as consumed by create_session
            def invocation_metadata(self):
                return {}

        token, _ = create_session(_DummyContext(), session, user, False, set_cookie=False)

        # deleted user aborts session creation, hence this follows and necessitates a second commit
        if delete_user:
            user.is_deleted = True

        # make recommendation order deterministic: earlier-created users rank higher
        user.recommendation_score = 1e10 - user.id

        if complete_profile:
            # give the user an avatar so has_completed_profile comes out True
            key = random_hex(32)
            filename = random_hex(32) + ".jpg"
            session.add(
                Upload(
                    key=key,
                    filename=filename,
                    creator_user_id=user.id,
                )
            )
            session.flush()
            user.avatar_key = key
            user.about_me = "I have a complete profile!\n" * 20

        session.commit()

        assert user.has_completed_profile == complete_profile

        # refresh it, undoes the expiry
        session.refresh(user)

        # this loads the user's timezone info which is lazy loaded, otherwise we'll get issues if we try to refer to it
        user.timezone  # noqa: B018

        # this detaches the user from the session, allowing its use outside this session
        session.expunge(user)

    return user, token
def get_user_id_and_token(session, username):
    """Look up a user's id and their session token by username."""
    user = session.execute(select(User).where(User.username == username)).scalar_one()
    user_session = session.execute(select(UserSession).where(UserSession.user_id == user.id)).scalar_one()
    return user.id, user_session.token
def make_friends(user1, user2):
    """Create an already-accepted friend relationship between the two users."""
    with session_scope() as session:
        session.add(
            FriendRelationship(
                from_user_id=user1.id,
                to_user_id=user2.id,
                status=FriendStatus.accepted,
            )
        )
def make_user_block(user1, user2):
    """Make user1 block user2."""
    with session_scope() as session:
        session.add(
            UserBlock(
                blocking_user_id=user1.id,
                blocked_user_id=user2.id,
            )
        )
        session.commit()
def make_user_invisible(user_id):
    """Ban the given user so they no longer show up to others."""
    with session_scope() as session:
        user = session.execute(select(User).where(User.id == user_id)).scalar_one()
        user.is_banned = True
# This doubles as get_FriendRequest, since a friend request is just a pending friend relationship
def get_friend_relationship(user1, user2):
    """
    Return the FriendRelationship between the two users (in either direction), or None.

    The returned object is expunged from the session so it can be used by the caller.
    """
    with session_scope() as session:
        friend_relationship = session.execute(
            select(FriendRelationship).where(
                or_(
                    # BUGFIX: the pairs must be combined with SQLAlchemy's and_(); the Python
                    # `and` keyword collapses `a and b` via bool(a) on the BinaryExpression,
                    # producing a query that ignored the to_user_id constraint entirely
                    and_(FriendRelationship.from_user_id == user1.id, FriendRelationship.to_user_id == user2.id),
                    and_(FriendRelationship.from_user_id == user2.id, FriendRelationship.to_user_id == user1.id),
                )
            )
        ).scalar_one_or_none()

        session.expunge(friend_relationship)
        return friend_relationship
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Injects the right `cookie: couchers-sesh=...` header into the metadata
    """

    def __init__(self, token):
        self.token = token

    def __call__(self, context, callback):
        metadata = (("cookie", f"couchers-sesh={self.token}"),)
        callback(metadata, None)
@contextmanager
def auth_api_session(grpc_channel_options=()):
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers

    Yields a tuple of (AuthStub, metadata interceptor); the interceptor records
    the response headers of the most recent call so tests can inspect cookies.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server)
        server.start()

        try:
            with grpc.secure_channel(
                f"localhost:{port}", grpc.local_channel_credentials(), options=grpc_channel_options
            ) as channel:

                class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
                    # stashes each call's initial metadata so the test can read response headers
                    def __init__(self):
                        self.latest_headers = {}

                    def intercept_unary_unary(self, continuation, client_call_details, request):
                        call = continuation(client_call_details, request)
                        self.latest_headers = dict(call.initial_metadata())
                        self.latest_header_raw = call.initial_metadata()
                        return call

                metadata_interceptor = _MetadataKeeperInterceptor()
                # rebind channel so all stub calls flow through the interceptor
                channel = grpc.intercept_channel(channel, metadata_interceptor)
                yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor
        finally:
            server.stop(None).wait()
@contextmanager
def api_session(token):
    """
    Create an API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), chan)
    stub = api_pb2_grpc.APIStub(chan)
    yield stub
@contextmanager
def real_api_session(token):
    """
    Create an API for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        api_pb2_grpc.add_APIServicer_to_server(API(), server)
        server.start()

        # bundle the session cookie into the channel credentials
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield api_pb2_grpc.APIStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_admin_session(token):
    """
    Create an Admin service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server)
        server.start()

        # bundle the session cookie into the channel credentials
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield admin_pb2_grpc.AdminStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_account_session(token):
    """
    Create an Account service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        account_pb2_grpc.add_AccountServicer_to_server(Account(), server)
        server.start()

        # bundle the session cookie into the channel credentials
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield account_pb2_grpc.AccountStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_jail_session(token):
    """
    Create a Jail service for testing, using TCP sockets, uses the token for auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), server)
        server.start()

        # bundle the session cookie into the channel credentials
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.metadata_call_credentials(CookieMetadataPlugin(token)),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield jail_pb2_grpc.JailStub(channel)
        finally:
            server.stop(None).wait()
class FakeRpcError(grpc.RpcError):
    """RpcError raised by FakeChannel.abort, exposing code() and details() like a live RPC error."""

    def __init__(self, code, details):
        self._code = code
        self._details = details

    def code(self):
        # the status code passed to abort()
        return self._code

    def details(self):
        # the human-readable detail string passed to abort()
        return self._details
def _check_user_perms(method, user_id, is_jailed, is_superuser, token_expiry):
    """
    Assert that the calling user may invoke this method, based on the service's
    auth_level proto annotation. Raises AssertionError on any violation.
    """
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, method_name = method.split("/")

    # look up the auth_level option declared on the service in its proto definition
    service_options = get_descriptor_pool().FindServiceByName(service_name).GetOptions()
    auth_level = service_options.Extensions[annotations_pb2.auth_level]
    # every service must declare an explicit, known auth level
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    if not user_id:
        # unauthenticated callers may only hit open services
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
    else:
        assert not (
            auth_level == annotations_pb2.AUTH_LEVEL_ADMIN and not is_superuser
        ), "Non-superuser tried to call superuser API"
        assert not (
            is_jailed and auth_level not in [annotations_pb2.AUTH_LEVEL_OPEN, annotations_pb2.AUTH_LEVEL_JAILED]
        ), "User is jailed but tried to call non-open/non-jailed API"
class FakeChannel:
    """
    In-process stand-in for both a gRPC server and channel: generated servicers
    register their handlers directly on this object, and stub calls are executed
    synchronously via fake_handler with a fresh DB session — no network involved,
    but request/response proto (de)serialization is still exercised.
    """

    def __init__(self, user_id=None, is_jailed=None, is_superuser=None, token_expiry=None):
        # uri -> method handler, populated by add_generic_rpc_handlers
        self.handlers = {}
        self.user_id = user_id
        self._is_jailed = is_jailed
        self._is_superuser = is_superuser
        self._token_expiry = token_expiry

    def abort(self, code, details):
        # mimics grpc's context.abort: raises so the test caller can inspect code/details
        raise FakeRpcError(code, details)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        # reuse grpc's own validation of the handler structure
        from grpc._server import _validate_generic_rpc_handlers

        _validate_generic_rpc_handlers(generic_rpc_handlers)

        self.handlers.update(generic_rpc_handlers[0]._method_handlers)

    def unary_unary(self, uri, request_serializer, response_deserializer):
        # called by the generated stub for each method; returns a callable that runs the RPC
        handler = self.handlers[uri]

        # enforce the service's auth level annotation like the real interceptors would
        _check_user_perms(uri, self.user_id, self._is_jailed, self._is_superuser, self._token_expiry)

        def fake_handler(request):
            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                response = handler.unary_unary(request, self, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler
def fake_channel(token=None):
    """Build a FakeChannel; with a token, resolve the caller's user details from it."""
    if not token:
        return FakeChannel()
    user_id, jailed, superuser, expiry = _try_get_and_update_user_details(
        token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
    )
    return FakeChannel(user_id=user_id, is_jailed=jailed, is_superuser=superuser, token_expiry=expiry)
@contextmanager
def conversations_session(token):
    """
    Create a Conversations API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), chan)
    stub = conversations_pb2_grpc.ConversationsStub(chan)
    yield stub
@contextmanager
def requests_session(token):
    """
    Create a Requests API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), chan)
    stub = requests_pb2_grpc.RequestsStub(chan)
    yield stub
@contextmanager
def threads_session(token):
    """Create a Threads API for testing, uses the token for auth."""
    chan = fake_channel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), chan)
    stub = threads_pb2_grpc.ThreadsStub(chan)
    yield stub
@contextmanager
def discussions_session(token):
    """Create a Discussions API for testing, uses the token for auth."""
    chan = fake_channel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), chan)
    stub = discussions_pb2_grpc.DiscussionsStub(chan)
    yield stub
@contextmanager
def donations_session(token):
    """Create a Donations API for testing, uses the token for auth."""
    chan = fake_channel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), chan)
    stub = donations_pb2_grpc.DonationsStub(chan)
    yield stub
@contextmanager
def real_stripe_session():
    """
    Create a Stripe service for testing, using TCP sockets
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield stripe_pb2_grpc.StripeStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def real_iris_session():
    """Create an Iris service for testing, using TCP sockets."""
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(executor, interceptors=[AuthValidatorInterceptor(), SessionInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server)
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{port}", grpc.local_channel_credentials()) as channel:
                yield iris_pb2_grpc.IrisStub(channel)
        finally:
            server.stop(None).wait()
@contextmanager
def pages_session(token):
    """Create a Pages API for testing, uses the token for auth."""
    chan = fake_channel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), chan)
    stub = pages_pb2_grpc.PagesStub(chan)
    yield stub
@contextmanager
def communities_session(token):
    """Create a Communities API for testing, uses the token for auth."""
    chan = fake_channel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), chan)
    stub = communities_pb2_grpc.CommunitiesStub(chan)
    yield stub
@contextmanager
def groups_session(token):
    """Create a Groups API for testing, uses the token for auth."""
    chan = fake_channel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), chan)
    stub = groups_pb2_grpc.GroupsStub(chan)
    yield stub
@contextmanager
def blocking_session(token):
    """Create a Blocking API for testing, uses the token for auth."""
    chan = fake_channel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), chan)
    stub = blocking_pb2_grpc.BlockingStub(chan)
    yield stub
@contextmanager
def notifications_session(token):
    """Create a Notifications API for testing, uses the token for auth."""
    chan = fake_channel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), chan)
    stub = notifications_pb2_grpc.NotificationsStub(chan)
    yield stub
@contextmanager
def account_session(token):
    """
    Create an Account API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), chan)
    stub = account_pb2_grpc.AccountStub(chan)
    yield stub
@contextmanager
def search_session(token):
    """
    Create a Search API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), chan)
    stub = search_pb2_grpc.SearchStub(chan)
    yield stub
@contextmanager
def references_session(token):
    """
    Create a References API for testing, uses the token for auth
    """
    chan = fake_channel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), chan)
    stub = references_pb2_grpc.ReferencesStub(chan)
    yield stub
@contextmanager
def reporting_session(token):
    """Create a Reporting API for testing, uses the token for auth."""
    chan = fake_channel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), chan)
    stub = reporting_pb2_grpc.ReportingStub(chan)
    yield stub
@contextmanager
def events_session(token):
    """Create an Events API for testing, uses the token for auth."""
    chan = fake_channel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), chan)
    stub = events_pb2_grpc.EventsStub(chan)
    yield stub
@contextmanager
def bugs_session(token=None):
    """Create a Bugs API for testing; auth is optional since bug reports may be anonymous."""
    chan = fake_channel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), chan)
    stub = bugs_pb2_grpc.BugsStub(chan)
    yield stub
@contextmanager
def resources_session():
    """Create a Resources API for testing; no auth needed."""
    chan = fake_channel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), chan)
    stub = resources_pb2_grpc.ResourcesStub(chan)
    yield stub
@contextmanager
def media_session(bearer_token):
    """
    Create a fresh Media API for testing, uses the bearer token for media auth
    """
    with futures.ThreadPoolExecutor(1) as executor:
        server = grpc.server(
            executor, interceptors=[get_media_auth_interceptor(bearer_token), SessionInterceptor()]
        )
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), server)
        server.start()

        # attach the bearer token to each call
        creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.access_token_call_credentials(bearer_token),
        )

        try:
            with grpc.secure_channel(f"localhost:{port}", creds) as channel:
                yield media_pb2_grpc.MediaStub(channel)
        finally:
            server.stop(None).wait()
@pytest.fixture(scope="class")
def testconfig():
    """
    Override the global config with deterministic testing values for the duration
    of a test class, restoring the previous config afterwards.
    """
    # snapshot the current config so it can be restored after the yield
    prevconfig = config.copy()
    config.clear()
    config.update(prevconfig)

    config["IN_TEST"] = True

    config["DEV"] = True
    config["SECRET"] = bytes.fromhex("448697d3886aec65830a1ea1497cdf804981e0c260d2f812cf2787c4ed1a262b")
    config["VERSION"] = "testing_version"
    config["BASE_URL"] = "http://localhost:3000"
    config["BACKEND_BASE_URL"] = "http://localhost:8888"
    config["CONSOLE_BASE_URL"] = "http://localhost:8888"
    config["COOKIE_DOMAIN"] = "localhost"

    # outbound SMS/email are disabled in tests; placeholder values only
    config["ENABLE_SMS"] = False
    config["SMS_SENDER_ID"] = "invalid"

    config["ENABLE_EMAIL"] = False
    config["NOTIFICATION_EMAIL_SENDER"] = "Couchers.org"
    config["NOTIFICATION_EMAIL_ADDRESS"] = "notify@couchers.org.invalid"
    config["NOTIFICATION_EMAIL_PREFIX"] = "[TEST] "
    config["REPORTS_EMAIL_RECIPIENT"] = "reports@couchers.org.invalid"
    config["CONTRIBUTOR_FORM_EMAIL_RECIPIENT"] = "forms@couchers.org.invalid"
    config["MODS_EMAIL_RECIPIENT"] = "mods@couchers.org.invalid"

    config["ENABLE_DONATIONS"] = False
    config["STRIPE_API_KEY"] = ""
    config["STRIPE_WEBHOOK_SECRET"] = ""
    config["STRIPE_RECURRING_PRODUCT_ID"] = ""

    config["ENABLE_STRONG_VERIFICATION"] = False
    config["IRIS_ID_PUBKEY"] = ""
    config["IRIS_ID_SECRET"] = ""
    # corresponds to private key e6c2fbf3756b387bc09a458a7b85935718ef3eb1c2777ef41d335c9f6c0ab272
    config["VERIFICATION_DATA_PUBLIC_KEY"] = bytes.fromhex(
        "dd740a2b2a35bf05041a28257ea439b30f76f056f3698000b71e6470cd82275f"
    )

    config["SMTP_HOST"] = "localhost"
    config["SMTP_PORT"] = 587
    config["SMTP_USERNAME"] = "username"
    config["SMTP_PASSWORD"] = "password"

    config["ENABLE_MEDIA"] = True
    config["MEDIA_SERVER_SECRET_KEY"] = bytes.fromhex(
        "91e29bbacc74fa7e23c5d5f34cca5015cb896e338a620003de94a502a461f4bc"
    )
    config["MEDIA_SERVER_BEARER_TOKEN"] = "c02d383897d3b82774ced09c9e17802164c37e7e105d8927553697bf4550e91e"
    config["MEDIA_SERVER_BASE_URL"] = "http://localhost:5001"
    config["MEDIA_SERVER_UPLOAD_BASE_URL"] = "http://localhost:5001"

    config["BUG_TOOL_ENABLED"] = False
    config["BUG_TOOL_GITHUB_REPO"] = "org/repo"
    config["BUG_TOOL_GITHUB_USERNAME"] = "user"
    config["BUG_TOOL_GITHUB_TOKEN"] = "token"

    config["LISTMONK_ENABLED"] = False
    config["LISTMONK_BASE_URL"] = "https://localhost"
    config["LISTMONK_API_KEY"] = "..."
    config["LISTMONK_LIST_UUID"] = "..."

    config["PUSH_NOTIFICATIONS_ENABLED"] = True
    config["PUSH_NOTIFICATIONS_VAPID_PRIVATE_KEY"] = "uI1DCR4G1AdlmMlPfRLemMxrz9f3h4kvjfnI8K9WsVI"
    config["PUSH_NOTIFICATIONS_VAPID_SUBJECT"] = "mailto:testing@couchers.org.invalid"

    yield None

    # restore the pre-test config
    config.clear()
    config.update(prevconfig)
@pytest.fixture
def fast_passwords():
    # password hashing, by design, takes a lot of time, which slows down the tests. here we jump through some hoops to
    # make this fast by removing the hashing step
    def cheap_hash(password: bytes) -> bytes:
        return b"fake hash:" + password

    def cheap_verify(hashed: bytes, password: bytes) -> bool:
        return cheap_hash(password) == hashed

    with patch("couchers.crypto.nacl.pwhash.verify", cheap_verify):
        with patch("couchers.crypto.nacl.pwhash.str", cheap_hash):
            yield
def process_jobs():
    """Drain the background job queue, running jobs until none remain."""
    while True:
        if not process_job():
            break
@contextmanager
def mock_notification_email():
    """Patch email queueing, yield the mock, then drain the job queue afterwards."""
    with patch("couchers.email._queue_email") as mocked:
        yield mocked
    # emails are sent via background jobs; run them so the mock captures the calls
    process_jobs()
@dataclass
class EmailData:
    """Kwargs captured from one mocked outgoing email (as extracted by email_fields)."""

    sender_name: str
    sender_email: str
    recipient: str
    subject: str
    plain: str
    html: str
    source_data: str
    list_unsubscribe_header: str
def email_fields(mock, call_ix=0):
    """Extract the keyword args of the call_ix-th mocked email call as an EmailData."""
    _, kwargs = mock.call_args_list[call_ix]
    field_names = (
        "sender_name",
        "sender_email",
        "recipient",
        "subject",
        "plain",
        "html",
        "source_data",
        "list_unsubscribe_header",
    )
    return EmailData(**{name: kwargs.get(name) for name in field_names})
@pytest.fixture
def push_collector():
    """
    See test_SendTestPushNotification for an example on how to use this fixture
    """

    class Push:
        """
        This allows nice access to the push info via e.g. push.title instead of push["title"]
        """

        def __init__(self, kwargs):
            self.kwargs = kwargs

        def __getattr__(self, attr):
            try:
                return self.kwargs[attr]
            except KeyError:
                # surface a normal AttributeError rather than leaking the KeyError
                raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attr}'") from None

        def __repr__(self):
            kwargs_disp = ", ".join(f"'{key}'='{val}'" for key, val in self.kwargs.items())
            return f"Push({kwargs_disp})"

    class PushCollector:
        """Records every push notification sent during the test, with assertion helpers."""

        def __init__(self):
            # pairs of (user_id, push)
            self.pushes = []

        def by_user(self, user_id):
            # all Push objects sent to the given user, in send order
            return [kwargs for uid, kwargs in self.pushes if uid == user_id]

        def push_to_user(self, session, user_id, **kwargs):
            # signature mirrors the patched couchers.notifications.push._push_to_user
            self.pushes.append((user_id, Push(kwargs=kwargs)))

        def assert_user_has_count(self, user_id, count):
            assert len(self.by_user(user_id)) == count

        def assert_user_push_matches_fields(self, user_id, ix=0, **kwargs):
            # checks only the given fields; extra fields on the push are ignored
            push = self.by_user(user_id)[ix]
            for kwarg in kwargs:
                assert kwarg in push.kwargs, f"Push notification {user_id=}, {ix=} missing field '{kwarg}'"
                assert (
                    push.kwargs[kwarg] == kwargs[kwarg]
                ), f"Push notification {user_id=}, {ix=} mismatch in field '{kwarg}', expected '{kwargs[kwarg]}' but got '{push.kwargs[kwarg]}'"

        def assert_user_has_single_matching(self, user_id, **kwargs):
            self.assert_user_has_count(user_id, 1)
            self.assert_user_push_matches_fields(user_id, ix=0, **kwargs)

    collector = PushCollector()

    with patch("couchers.notifications.push._push_to_user", collector.push_to_user):
        yield collector