Coverage for app/backend/src/tests/fixtures/sessions.py: 99%
295 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1from collections.abc import Generator
2from concurrent import futures
3from contextlib import contextmanager
4from typing import Any, NoReturn
5from zoneinfo import ZoneInfo
7import grpc
8from grpc._server import _validate_generic_rpc_handlers
10from couchers.context import make_interactive_context
11from couchers.db import session_scope
12from couchers.descriptor_pool import get_descriptor_pool
13from couchers.i18n import LocalizationContext
14from couchers.i18n.locales import DEFAULT_LOCALE
15from couchers.interceptors import (
16 CouchersMiddlewareInterceptor,
17 _try_get_and_update_user_details,
18 check_permissions,
19 find_auth_level,
20)
21from couchers.proto import (
22 account_pb2_grpc,
23 admin_pb2_grpc,
24 api_pb2_grpc,
25 auth_pb2_grpc,
26 blocking_pb2_grpc,
27 bugs_pb2_grpc,
28 communities_pb2_grpc,
29 conversations_pb2_grpc,
30 discussions_pb2_grpc,
31 donations_pb2_grpc,
32 editor_pb2_grpc,
33 events_pb2_grpc,
34 galleries_pb2_grpc,
35 gis_pb2_grpc,
36 groups_pb2_grpc,
37 iris_pb2_grpc,
38 jail_pb2_grpc,
39 media_pb2_grpc,
40 moderation_pb2_grpc,
41 notifications_pb2_grpc,
42 pages_pb2_grpc,
43 postal_verification_pb2_grpc,
44 public_pb2_grpc,
45 public_trips_pb2_grpc,
46 references_pb2_grpc,
47 reporting_pb2_grpc,
48 requests_pb2_grpc,
49 resources_pb2_grpc,
50 search_pb2_grpc,
51 stripe_pb2_grpc,
52 threads_pb2_grpc,
53)
54from couchers.servicers.account import Account, Iris
55from couchers.servicers.admin import Admin
56from couchers.servicers.api import API
57from couchers.servicers.auth import Auth
58from couchers.servicers.blocking import Blocking
59from couchers.servicers.bugs import Bugs
60from couchers.servicers.communities import Communities
61from couchers.servicers.conversations import Conversations
62from couchers.servicers.discussions import Discussions
63from couchers.servicers.donations import Donations, Stripe
64from couchers.servicers.editor import Editor
65from couchers.servicers.events import Events
66from couchers.servicers.galleries import Galleries
67from couchers.servicers.gis import GIS
68from couchers.servicers.groups import Groups
69from couchers.servicers.jail import Jail
70from couchers.servicers.media import Media, get_media_auth_interceptor
71from couchers.servicers.moderation import Moderation
72from couchers.servicers.notifications import Notifications
73from couchers.servicers.pages import Pages
74from couchers.servicers.postal_verification import PostalVerification
75from couchers.servicers.public import Public
76from couchers.servicers.public_trips import PublicTrips
77from couchers.servicers.references import References
78from couchers.servicers.reporting import Reporting
79from couchers.servicers.requests import Requests
80from couchers.servicers.resources import Resources
81from couchers.servicers.search import Search
82from couchers.servicers.threads import Threads
85class _MockCouchersContext:
86 @property
87 def headers(self):
88 return {}
90 def get_header(self, name):
91 return None
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Auth metadata plugin that adds a `cookie: couchers-sesh=<token>` entry to the call metadata
    """

    def __init__(self, token: str):
        self.token = token

    def __call__(self, context, callback) -> None:
        cookie_value = f"couchers-sesh={self.token}"
        metadata = (("cookie", cookie_value),)
        # Second argument is the error slot; None signals success
        callback(metadata, None)
class MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Client interceptor that remembers the initial metadata of the most recent unary-unary call.

    Attributes:
        latest_headers: the initial metadata converted with `dict()`, so a repeated key keeps
            only its last value.
        latest_header_raw: the initial metadata exactly as returned by the call; an empty tuple
            until the first call has been intercepted.
    """

    def __init__(self):
        self.latest_headers = {}
        # Set here as well, so reading it before any call has been made doesn't raise AttributeError
        self.latest_header_raw = ()

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """
        Run the call through `continuation`, store its initial metadata, and return the call.
        """
        call = continuation(client_call_details, request)
        # Fetch once so both attributes are guaranteed to describe the same metadata
        initial_metadata = call.initial_metadata()
        self.latest_headers = dict(initial_metadata)
        self.latest_header_raw = initial_metadata
        return call
class FakeRpcError(grpc.RpcError):
    """
    RpcError carrying a status code and a details string, readable via `code()` and `details()`
    """

    def __init__(self, code: grpc.StatusCode, details: str):
        self._details = details
        self._code = code

    def details(self) -> str:
        return self._details

    def code(self) -> grpc.StatusCode:
        return self._code
class MockGrpcContext:
    """
    In-memory stand-in for grpc.ServicerContext, for use in tests.
    """

    def __init__(self):
        self._invocation_metadata: list[tuple[str, str]] = []
        self._initial_metadata = []

    def send_initial_metadata(self, metadata):
        # Accumulate rather than overwrite, so repeated calls keep earlier entries
        self._initial_metadata += metadata

    def invocation_metadata(self) -> list[tuple[str, str]]:
        return self._invocation_metadata

    def abort(self, code: grpc.StatusCode, details: str) -> NoReturn:
        # Aborting surfaces to the caller as a FakeRpcError carrying the code and details
        error = FakeRpcError(code, details)
        raise error
class FakeChannel:
    """
    Mock gRPC channel for testing that orchestrates context creation.

    This holds the test state (token) and builds a context with
    `make_interactive_context` each time a handler is invoked.

    It is passed both where a server is expected (servicer registration goes
    through `add_generic_rpc_handlers`) and where a channel is expected (stubs
    call `unary_unary`).
    """

    def __init__(self, token: str | None = None):
        # Keyed by the method name that is later passed to `unary_unary`
        self.handlers: dict[str, Any] = {}
        self._token = token
        self._pool = get_descriptor_pool()

    def add_generic_rpc_handlers(self, generic_rpc_handlers: Any):
        """
        Register the method handlers of every generic handler in `generic_rpc_handlers`.
        """
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        # Take the handlers from all entries, not only the first one, so none are silently dropped
        for generic_handler in generic_rpc_handlers:
            self.handlers.update(generic_handler._method_handlers)

    def unary_unary(self, method, request_serializer, response_deserializer):
        """
        Return a callable that invokes the handler registered for `method` in-process.

        Raises KeyError if no handler was registered under `method`.
        """
        handler = self.handlers[method]

        def fake_handler(request):
            # Authenticate and authorise before the servicer code runs
            auth_info = _try_get_and_update_user_details(
                self._token,
                is_api_key=False,
                ip_address="127.0.0.1",
                user_agent="Testing User-Agent",
                sofa=None,
                client_platform=None,
            )
            auth_level = find_auth_level(self._pool, method)
            check_permissions(auth_info, auth_level)

            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                context = make_interactive_context(
                    grpc_context=MockGrpcContext(),
                    user_id=auth_info.user_id if auth_info else None,
                    is_api_key=False,
                    # The token is only handed on when it resolved to auth info
                    token=self._token if auth_info else None,
                    localization=LocalizationContext(
                        locale=(auth_info and auth_info.ui_language_preference) or DEFAULT_LOCALE,
                        timezone=ZoneInfo((auth_info and auth_info.timezone) or "Etc/UTC"),
                    ),
                    sofa="test_sofa_cookie_value",
                )

                response = handler.unary_unary(request, context, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler
@contextmanager
def run_server(grpc_channel_options=(), token: str | None = None):
    """
    Start a gRPC server on an OS-assigned localhost port and open a client channel to it

    Yields `(server, channel, metadata_interceptor)`. The server is already started when it is
    yielded, and is stopped again when the context exits. If `token` is truthy, the channel
    credentials include a CookieMetadataPlugin built from it.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        channel_creds = grpc.local_channel_credentials()
        if token:
            cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
            channel_creds = grpc.composite_channel_credentials(channel_creds, cookie_creds)

        server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # Port 0 lets the OS choose a free port; the actual port is returned
        bound_port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        server.start()

        try:
            target = f"localhost:{bound_port}"
            with grpc.secure_channel(target, channel_creds, options=grpc_channel_options) as raw_channel:
                keeper = MetadataKeeperInterceptor()
                yield server, grpc.intercept_channel(raw_channel, keeper), keeper
        finally:
            server.stop(None).wait()
226# Sessions that start a real GRPC server.
@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, MetadataKeeperInterceptor]]:
    """
    Yield an Auth stub together with the interceptor that keeps the latest initial metadata

    This runs against the real server since Auth plays around with headers
    """
    with run_server(grpc_channel_options) as (grpc_server, grpc_channel, header_keeper):
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), grpc_server)
        auth_stub = auth_pb2_grpc.AuthStub(grpc_channel)
        yield auth_stub, header_keeper
@contextmanager
def real_api_session(token: str):
    """
    Yield an API stub connected to a real server over TCP sockets; `token` is used for auth
    """
    with run_server(token=token) as (grpc_server, grpc_channel, _):
        api_pb2_grpc.add_APIServicer_to_server(API(), grpc_server)
        stub = api_pb2_grpc.APIStub(grpc_channel)
        yield stub
@contextmanager
def real_admin_session(token: str):
    """
    Yield an Admin stub connected to a real server over TCP sockets; `token` is used for auth
    """
    with run_server(token=token) as (grpc_server, grpc_channel, _):
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), grpc_server)
        stub = admin_pb2_grpc.AdminStub(grpc_channel)
        yield stub
@contextmanager
def real_editor_session(token: str):
    """
    Yield an Editor stub connected to a real server over TCP sockets; `token` is used for auth
    """
    with run_server(token=token) as (grpc_server, grpc_channel, _):
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), grpc_server)
        stub = editor_pb2_grpc.EditorStub(grpc_channel)
        yield stub
@contextmanager
def real_moderation_session(token: str):
    """
    Yield a Moderation stub connected to a real server over TCP sockets; `token` is used for auth
    """
    with run_server(token=token) as (grpc_server, grpc_channel, _):
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), grpc_server)
        stub = moderation_pb2_grpc.ModerationStub(grpc_channel)
        yield stub
@contextmanager
def real_account_session(token: str):
    """
    Yield an Account stub connected to a real server over TCP sockets; `token` is used for auth
    """
    with run_server(token=token) as (grpc_server, grpc_channel, _):
        account_pb2_grpc.add_AccountServicer_to_server(Account(), grpc_server)
        stub = account_pb2_grpc.AccountStub(grpc_channel)
        yield stub
@contextmanager
def real_jail_session(token: str):
    """
    Yield a Jail stub connected to a real server over TCP sockets; `token` is used for auth
    """
    with run_server(token=token) as (grpc_server, grpc_channel, _):
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), grpc_server)
        stub = jail_pb2_grpc.JailStub(grpc_channel)
        yield stub
@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub connected to a real server over TCP sockets; no token is supplied
    """
    with run_server() as (grpc_server, grpc_channel, _):
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), grpc_server)
        stub = stripe_pb2_grpc.StripeStub(grpc_channel)
        yield stub
@contextmanager
def real_iris_session():
    """
    Yield an Iris stub connected to a real server started by `run_server`; no token is supplied
    """
    with run_server() as (grpc_server, grpc_channel, _):
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), grpc_server)
        stub = iris_pb2_grpc.IrisStub(grpc_channel)
        yield stub
@contextmanager
def real_bugs_session():
    """
    Yield a Bugs stub and the metadata interceptor, backed by a real server

    A real server is used so that requests can carry metadata (HTTP request headers)
    and the response's initial metadata (HTTP response headers) can be asserted.
    """
    with run_server() as (grpc_server, grpc_channel, header_keeper):
        bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), grpc_server)
        bugs_stub = bugs_pb2_grpc.BugsStub(grpc_channel)
        yield bugs_stub, header_keeper
@contextmanager
def media_session(bearer_token: str):
    """
    Yield a Media stub connected to a freshly started Media server

    `bearer_token` configures both the server's media auth interceptor and the client's
    access-token call credentials. The server is stopped when the context exits.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        srv = grpc.server(pool, interceptors=[auth_interceptor])
        # Port 0 lets the OS choose a free port; the actual port is returned
        bound_port = srv.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), srv)
        srv.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            target = f"localhost:{bound_port}"
            with grpc.secure_channel(target, channel_creds) as grpc_channel:
                yield media_pb2_grpc.MediaStub(grpc_channel)
        finally:
            srv.stop(None).wait()
352# Sessions that don't need to start a real GRPC server.
353# Note: these don't need to be context managers, but they are so that
354# we can switch to a real implementation if needed.
@contextmanager
def api_session(token: str):
    """
    Yield an API stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake_channel)
    stub = api_pb2_grpc.APIStub(fake_channel)
    yield stub
@contextmanager
def gis_session(token: str):
    """
    Yield a GIS stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake_channel)
    stub = gis_pb2_grpc.GISStub(fake_channel)
    yield stub
@contextmanager
def public_session():
    """
    Yield a Public stub backed by an in-process FakeChannel created without a token
    """
    fake_channel = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake_channel)
    stub = public_pb2_grpc.PublicStub(fake_channel)
    yield stub
@contextmanager
def public_trips_session(token: str):
    """
    Yield a PublicTrips stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    public_trips_pb2_grpc.add_PublicTripsServicer_to_server(PublicTrips(), fake_channel)
    stub = public_trips_pb2_grpc.PublicTripsStub(fake_channel)
    yield stub
@contextmanager
def conversations_session(token: str):
    """
    Yield a Conversations stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake_channel)
    stub = conversations_pb2_grpc.ConversationsStub(fake_channel)
    yield stub
@contextmanager
def requests_session(token: str):
    """
    Yield a Requests stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake_channel)
    stub = requests_pb2_grpc.RequestsStub(fake_channel)
    yield stub
@contextmanager
def threads_session(token: str):
    """
    Yield a Threads stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake_channel)
    stub = threads_pb2_grpc.ThreadsStub(fake_channel)
    yield stub
@contextmanager
def discussions_session(token: str):
    """
    Yield a Discussions stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake_channel)
    stub = discussions_pb2_grpc.DiscussionsStub(fake_channel)
    yield stub
@contextmanager
def donations_session(token: str):
    """
    Yield a Donations stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake_channel)
    stub = donations_pb2_grpc.DonationsStub(fake_channel)
    yield stub
@contextmanager
def pages_session(token: str):
    """
    Yield a Pages stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake_channel)
    stub = pages_pb2_grpc.PagesStub(fake_channel)
    yield stub
@contextmanager
def communities_session(token: str):
    """
    Yield a Communities stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake_channel)
    stub = communities_pb2_grpc.CommunitiesStub(fake_channel)
    yield stub
@contextmanager
def groups_session(token: str):
    """
    Yield a Groups stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake_channel)
    stub = groups_pb2_grpc.GroupsStub(fake_channel)
    yield stub
@contextmanager
def blocking_session(token: str):
    """
    Yield a Blocking stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake_channel)
    stub = blocking_pb2_grpc.BlockingStub(fake_channel)
    yield stub
@contextmanager
def notifications_session(token: str):
    """
    Yield a Notifications stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake_channel)
    stub = notifications_pb2_grpc.NotificationsStub(fake_channel)
    yield stub
@contextmanager
def account_session(token: str):
    """
    Yield an Account stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake_channel)
    stub = account_pb2_grpc.AccountStub(fake_channel)
    yield stub
@contextmanager
def search_session(token: str):
    """
    Yield a Search stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake_channel)
    stub = search_pb2_grpc.SearchStub(fake_channel)
    yield stub
@contextmanager
def references_session(token: str):
    """
    Yield a References stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake_channel)
    stub = references_pb2_grpc.ReferencesStub(fake_channel)
    yield stub
@contextmanager
def galleries_session(token: str):
    """
    Yield a Galleries stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake_channel)
    stub = galleries_pb2_grpc.GalleriesStub(fake_channel)
    yield stub
@contextmanager
def reporting_session(token: str):
    """
    Yield a Reporting stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake_channel)
    stub = reporting_pb2_grpc.ReportingStub(fake_channel)
    yield stub
@contextmanager
def events_session(token: str):
    """
    Yield an Events stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake_channel)
    stub = events_pb2_grpc.EventsStub(fake_channel)
    yield stub
@contextmanager
def postal_verification_session(token: str):
    """
    Yield a PostalVerification stub backed by an in-process FakeChannel; `token` is used for auth
    """
    fake_channel = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), fake_channel)
    stub = postal_verification_pb2_grpc.PostalVerificationStub(fake_channel)
    yield stub
@contextmanager
def bugs_session(token: str | None = None):
    """
    Yield a Bugs stub backed by an in-process FakeChannel; `token` is optional and defaults to None
    """
    fake_channel = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake_channel)
    stub = bugs_pb2_grpc.BugsStub(fake_channel)
    yield stub
@contextmanager
def resources_session():
    """
    Yield a Resources stub backed by an in-process FakeChannel created without a token
    """
    fake_channel = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake_channel)
    stub = resources_pb2_grpc.ResourcesStub(fake_channel)
    yield stub