Coverage for app / backend / src / tests / fixtures / sessions.py: 99%
288 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1from collections.abc import Generator
2from concurrent import futures
3from contextlib import contextmanager
4from typing import Any, NoReturn
5from zoneinfo import ZoneInfo
7import grpc
8from grpc._server import _validate_generic_rpc_handlers
10from couchers.context import make_interactive_context
11from couchers.db import session_scope
12from couchers.descriptor_pool import get_descriptor_pool
13from couchers.i18n import LocalizationContext
14from couchers.i18n.locales import DEFAULT_LOCALE
15from couchers.interceptors import (
16 CouchersMiddlewareInterceptor,
17 _try_get_and_update_user_details,
18 check_permissions,
19 find_auth_level,
20)
21from couchers.proto import (
22 account_pb2_grpc,
23 admin_pb2_grpc,
24 api_pb2_grpc,
25 auth_pb2_grpc,
26 blocking_pb2_grpc,
27 bugs_pb2_grpc,
28 communities_pb2_grpc,
29 conversations_pb2_grpc,
30 discussions_pb2_grpc,
31 donations_pb2_grpc,
32 editor_pb2_grpc,
33 events_pb2_grpc,
34 galleries_pb2_grpc,
35 gis_pb2_grpc,
36 groups_pb2_grpc,
37 iris_pb2_grpc,
38 jail_pb2_grpc,
39 media_pb2_grpc,
40 moderation_pb2_grpc,
41 notifications_pb2_grpc,
42 pages_pb2_grpc,
43 postal_verification_pb2_grpc,
44 public_pb2_grpc,
45 public_trips_pb2_grpc,
46 references_pb2_grpc,
47 reporting_pb2_grpc,
48 requests_pb2_grpc,
49 resources_pb2_grpc,
50 search_pb2_grpc,
51 stripe_pb2_grpc,
52 threads_pb2_grpc,
53)
54from couchers.servicers.account import Account, Iris
55from couchers.servicers.admin import Admin
56from couchers.servicers.api import API
57from couchers.servicers.auth import Auth
58from couchers.servicers.blocking import Blocking
59from couchers.servicers.bugs import Bugs
60from couchers.servicers.communities import Communities
61from couchers.servicers.conversations import Conversations
62from couchers.servicers.discussions import Discussions
63from couchers.servicers.donations import Donations, Stripe
64from couchers.servicers.editor import Editor
65from couchers.servicers.events import Events
66from couchers.servicers.galleries import Galleries
67from couchers.servicers.gis import GIS
68from couchers.servicers.groups import Groups
69from couchers.servicers.jail import Jail
70from couchers.servicers.media import Media, get_media_auth_interceptor
71from couchers.servicers.moderation import Moderation
72from couchers.servicers.notifications import Notifications
73from couchers.servicers.pages import Pages
74from couchers.servicers.postal_verification import PostalVerification
75from couchers.servicers.public import Public
76from couchers.servicers.public_trips import PublicTrips
77from couchers.servicers.references import References
78from couchers.servicers.reporting import Reporting
79from couchers.servicers.requests import Requests
80from couchers.servicers.resources import Resources
81from couchers.servicers.search import Search
82from couchers.servicers.threads import Threads
85class _MockCouchersContext:
86 @property
87 def headers(self):
88 return {}
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Call-credentials plugin that adds the session cookie to a call's metadata,
    i.e. a `cookie: couchers-sesh=<token>` entry.
    """

    def __init__(self, token: str):
        # The session token that is embedded in the cookie value.
        self.token = token

    def __call__(self, context, callback) -> None:
        cookie_value = f"couchers-sesh={self.token}"
        metadata = (("cookie", cookie_value),)
        # The callback takes the metadata plus a second argument, which is None here.
        callback(metadata, None)
class MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Client interceptor that remembers the initial metadata of the most recent
    unary-unary call, both as a dict (`latest_headers`) and exactly as returned
    by the call (`latest_header_raw`).
    """

    def __init__(self):
        # Both attributes are initialised here so that reading either one before
        # the first intercepted call gives an empty value instead of raising
        # AttributeError.
        self.latest_headers = {}
        self.latest_header_raw = ()

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """
        Run the call via `continuation`, record its initial metadata, and return the call.
        """
        call = continuation(client_call_details, request)
        # Fetch the metadata once and derive both views from the same value.
        initial_metadata = call.initial_metadata()
        self.latest_headers = dict(initial_metadata)
        self.latest_header_raw = initial_metadata
        return call
class FakeRpcError(grpc.RpcError):
    """
    RpcError stand-in that carries a status code and a details string and
    exposes them through `code()` and `details()`.
    """

    def __init__(self, code: grpc.StatusCode, details: str):
        self._details = details
        self._code = code

    def code(self) -> grpc.StatusCode:
        """
        Return the status code this error was created with.
        """
        return self._code

    def details(self) -> str:
        """
        Return the details string this error was created with.
        """
        return self._details
class MockGrpcContext:
    """
    Pure in-memory replacement for grpc.ServicerContext, for use in tests.
    """

    def __init__(self):
        # Metadata returned by invocation_metadata(); starts out empty.
        self._invocation_metadata: list[tuple[str, str]] = []
        # Accumulates everything passed to send_initial_metadata().
        self._initial_metadata = []

    def abort(self, code: grpc.StatusCode, details: str) -> NoReturn:
        """
        Raise a FakeRpcError carrying the given code and details.
        """
        error = FakeRpcError(code, details)
        raise error

    def invocation_metadata(self) -> list[tuple[str, str]]:
        """
        Return the stored invocation metadata list.
        """
        return self._invocation_metadata

    def send_initial_metadata(self, metadata):
        """
        Append the given metadata entries to the recorded initial metadata.
        """
        self._initial_metadata += metadata
class FakeChannel:
    """
    Mock gRPC channel for testing that orchestrates context creation.

    Servicers are registered on it as if it were a server, and stubs are built
    on it as if it were a channel. It holds the test state (token) and builds a
    context via `make_interactive_context` each time a handler is invoked.
    """

    def __init__(self, token: str | None = None):
        # Maps a method name (as passed to `unary_unary`) to its registered handler.
        self.handlers: dict[str, Any] = {}
        self._token = token
        self._pool = get_descriptor_pool()

    def add_generic_rpc_handlers(self, generic_rpc_handlers: Any):
        """
        Register the method handlers of every generic handler that is passed in.
        """
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        # Register all of them, not only the first, so nothing is silently dropped.
        for generic_handler in generic_rpc_handlers:
            self.handlers.update(generic_handler._method_handlers)

    def add_registered_method_handlers(self, service_name: str, method_handlers: dict[str, Any]):
        """
        Register handlers given per service, mirroring `grpc.Server.add_registered_method_handlers`.

        Handlers are stored under `/<service_name>/<method_name>`.
        NOTE(review): assumes this matches the method string stubs pass to
        `unary_unary` — confirm against the generated code.
        """
        for method_name, method_handler in method_handlers.items():
            self.handlers[f"/{service_name}/{method_name}"] = method_handler

    def unary_unary(self, method, request_serializer, response_deserializer, _registered_method=False):
        """
        Return a callable that invokes the handler registered for `method` in-process.

        `_registered_method` is accepted only for signature compatibility with
        `grpc.Channel.unary_unary` and is otherwise ignored.

        Raises KeyError if no handler is registered for `method`.
        """
        handler = self.handlers[method]

        def fake_handler(request):
            # Look up the caller and check permissions before running the handler.
            auth_info = _try_get_and_update_user_details(
                self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
            )
            auth_level = find_auth_level(self._pool, method)
            check_permissions(auth_info, auth_level)

            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                context = make_interactive_context(
                    grpc_context=MockGrpcContext(),
                    user_id=auth_info.user_id if auth_info else None,
                    is_api_key=False,
                    token=self._token if auth_info else None,
                    localization=LocalizationContext(
                        # Fall back to defaults when there is no user or no preference.
                        locale=(auth_info and auth_info.ui_language_preference) or DEFAULT_LOCALE,
                        timezone=ZoneInfo((auth_info and auth_info.timezone) or "Etc/UTC"),
                    ),
                )

                response = handler.unary_unary(request, context, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler
@contextmanager
def run_server(grpc_channel_options=(), token: str | None = None):
    """
    Start a real gRPC server on a localhost port and yield `(server, channel, metadata_interceptor)`.

    The server runs with CouchersMiddlewareInterceptor on a single-worker thread
    pool. The channel is wrapped with a MetadataKeeperInterceptor, and when
    `token` is truthy it also sends the session cookie via CookieMetadataPlugin.
    The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        creds = grpc.local_channel_credentials()
        if token:
            cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
            creds = grpc.composite_channel_credentials(creds, cookie_creds)

        srv = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        # Port 0 in the address; the actual port is whatever add_secure_port returns.
        port = srv.add_secure_port("localhost:0", grpc.local_server_credentials())
        srv.start()

        try:
            target = f"localhost:{port}"
            with grpc.secure_channel(target, creds, options=grpc_channel_options) as raw_channel:
                metadata_interceptor = MetadataKeeperInterceptor()
                intercepted_channel = grpc.intercept_channel(raw_channel, metadata_interceptor)
                yield srv, intercepted_channel, metadata_interceptor
        finally:
            srv.stop(None).wait()
# Sessions that start a real gRPC server.
@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, MetadataKeeperInterceptor]]:
    """
    Yield an Auth stub together with the interceptor that records response metadata.

    This runs against the real server (see run_server) since it plays around with headers.
    """
    with run_server(grpc_channel_options) as (srv, chan, metadata_keeper):
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), srv)
        stub = auth_pb2_grpc.AuthStub(chan)
        yield stub, metadata_keeper
@contextmanager
def real_api_session(token: str):
    """
    Yield an API stub backed by a real server (see run_server); `token` is used for auth.
    """
    with run_server(token=token) as (srv, chan, _interceptor):
        api_pb2_grpc.add_APIServicer_to_server(API(), srv)
        stub = api_pb2_grpc.APIStub(chan)
        yield stub
@contextmanager
def real_admin_session(token: str):
    """
    Yield an Admin stub backed by a real server (see run_server); `token` is used for auth.
    """
    with run_server(token=token) as (srv, chan, _interceptor):
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), srv)
        stub = admin_pb2_grpc.AdminStub(chan)
        yield stub
@contextmanager
def real_editor_session(token: str):
    """
    Yield an Editor stub backed by a real server (see run_server); `token` is used for auth.
    """
    with run_server(token=token) as (srv, chan, _interceptor):
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), srv)
        stub = editor_pb2_grpc.EditorStub(chan)
        yield stub
@contextmanager
def real_moderation_session(token: str):
    """
    Yield a Moderation stub backed by a real server (see run_server); `token` is used for auth.
    """
    with run_server(token=token) as (srv, chan, _interceptor):
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), srv)
        stub = moderation_pb2_grpc.ModerationStub(chan)
        yield stub
@contextmanager
def real_account_session(token: str):
    """
    Yield an Account stub backed by a real server (see run_server); `token` is used for auth.
    """
    with run_server(token=token) as (srv, chan, _interceptor):
        account_pb2_grpc.add_AccountServicer_to_server(Account(), srv)
        stub = account_pb2_grpc.AccountStub(chan)
        yield stub
@contextmanager
def real_jail_session(token: str):
    """
    Yield a Jail stub backed by a real server (see run_server); `token` is used for auth.
    """
    with run_server(token=token) as (srv, chan, _interceptor):
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), srv)
        stub = jail_pb2_grpc.JailStub(chan)
        yield stub
@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub backed by a real server (see run_server); no token is supplied.
    """
    with run_server() as (srv, chan, _interceptor):
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), srv)
        stub = stripe_pb2_grpc.StripeStub(chan)
        yield stub
@contextmanager
def real_iris_session():
    """
    Yield an Iris stub backed by a real server (see run_server); no token is supplied.
    """
    with run_server() as (srv, chan, _interceptor):
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), srv)
        stub = iris_pb2_grpc.IrisStub(chan)
        yield stub
@contextmanager
def media_session(bearer_token: str):
    """
    Yield a Media stub served by a fresh real gRPC server.

    The server uses the interceptor from `get_media_auth_interceptor(bearer_token)`,
    and the client channel sends `bearer_token` as access-token call credentials.
    The server is stopped when the context exits.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as executor:
        srv = grpc.server(executor, interceptors=[auth_interceptor])
        port = srv.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), srv)
        srv.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            target = f"localhost:{port}"
            with grpc.secure_channel(target, channel_creds) as chan:
                stub = media_pb2_grpc.MediaStub(chan)
                yield stub
        finally:
            srv.stop(None).wait()
# Sessions that don't need to start a real gRPC server.
# Note: these don't strictly need to be context managers, but they are so that
# we can switch to a real implementation if needed.
@contextmanager
def api_session(token: str):
    """
    Yield an API stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake_channel)
    stub = api_pb2_grpc.APIStub(fake_channel)
    yield stub
@contextmanager
def gis_session(token: str):
    """
    Yield a GIS stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake_channel)
    stub = gis_pb2_grpc.GISStub(fake_channel)
    yield stub
@contextmanager
def public_session():
    """
    Yield a Public stub bound to an in-process FakeChannel created without a token.
    """
    fake_channel = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake_channel)
    stub = public_pb2_grpc.PublicStub(fake_channel)
    yield stub
@contextmanager
def public_trips_session(token: str):
    """
    Yield a PublicTrips stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    public_trips_pb2_grpc.add_PublicTripsServicer_to_server(PublicTrips(), fake_channel)
    stub = public_trips_pb2_grpc.PublicTripsStub(fake_channel)
    yield stub
@contextmanager
def conversations_session(token: str):
    """
    Yield a Conversations stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake_channel)
    stub = conversations_pb2_grpc.ConversationsStub(fake_channel)
    yield stub
@contextmanager
def requests_session(token: str):
    """
    Yield a Requests stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake_channel)
    stub = requests_pb2_grpc.RequestsStub(fake_channel)
    yield stub
@contextmanager
def threads_session(token: str):
    """
    Yield a Threads stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake_channel)
    stub = threads_pb2_grpc.ThreadsStub(fake_channel)
    yield stub
@contextmanager
def discussions_session(token: str):
    """
    Yield a Discussions stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake_channel)
    stub = discussions_pb2_grpc.DiscussionsStub(fake_channel)
    yield stub
@contextmanager
def donations_session(token: str):
    """
    Yield a Donations stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake_channel)
    stub = donations_pb2_grpc.DonationsStub(fake_channel)
    yield stub
@contextmanager
def pages_session(token: str):
    """
    Yield a Pages stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake_channel)
    stub = pages_pb2_grpc.PagesStub(fake_channel)
    yield stub
@contextmanager
def communities_session(token: str):
    """
    Yield a Communities stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake_channel)
    stub = communities_pb2_grpc.CommunitiesStub(fake_channel)
    yield stub
@contextmanager
def groups_session(token: str):
    """
    Yield a Groups stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake_channel)
    stub = groups_pb2_grpc.GroupsStub(fake_channel)
    yield stub
@contextmanager
def blocking_session(token: str):
    """
    Yield a Blocking stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake_channel)
    stub = blocking_pb2_grpc.BlockingStub(fake_channel)
    yield stub
@contextmanager
def notifications_session(token: str):
    """
    Yield a Notifications stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake_channel)
    stub = notifications_pb2_grpc.NotificationsStub(fake_channel)
    yield stub
@contextmanager
def account_session(token: str):
    """
    Yield an Account stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake_channel)
    stub = account_pb2_grpc.AccountStub(fake_channel)
    yield stub
@contextmanager
def search_session(token: str):
    """
    Yield a Search stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake_channel)
    stub = search_pb2_grpc.SearchStub(fake_channel)
    yield stub
@contextmanager
def references_session(token: str):
    """
    Yield a References stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake_channel)
    stub = references_pb2_grpc.ReferencesStub(fake_channel)
    yield stub
@contextmanager
def galleries_session(token: str):
    """
    Yield a Galleries stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake_channel)
    stub = galleries_pb2_grpc.GalleriesStub(fake_channel)
    yield stub
@contextmanager
def reporting_session(token: str):
    """
    Yield a Reporting stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake_channel)
    stub = reporting_pb2_grpc.ReportingStub(fake_channel)
    yield stub
@contextmanager
def events_session(token: str):
    """
    Yield an Events stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake_channel)
    stub = events_pb2_grpc.EventsStub(fake_channel)
    yield stub
@contextmanager
def postal_verification_session(token: str):
    """
    Yield a PostalVerification stub bound to an in-process FakeChannel; `token` is used for auth.
    """
    fake_channel = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), fake_channel)
    stub = postal_verification_pb2_grpc.PostalVerificationStub(fake_channel)
    yield stub
@contextmanager
def bugs_session(token: str | None = None):
    """
    Yield a Bugs stub bound to an in-process FakeChannel; `token` is optional.
    """
    fake_channel = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake_channel)
    stub = bugs_pb2_grpc.BugsStub(fake_channel)
    yield stub
@contextmanager
def resources_session():
    """
    Yield a Resources stub bound to an in-process FakeChannel created without a token.
    """
    fake_channel = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake_channel)
    stub = resources_pb2_grpc.ResourcesStub(fake_channel)
    yield stub