Coverage for app / backend / src / tests / fixtures / sessions.py: 99%
282 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1from collections.abc import Generator
2from concurrent import futures
3from contextlib import contextmanager
4from typing import Any, NoReturn
5from zoneinfo import ZoneInfo
7import grpc
8from grpc._server import _validate_generic_rpc_handlers
10from couchers.context import make_interactive_context
11from couchers.db import session_scope
12from couchers.descriptor_pool import get_descriptor_pool
13from couchers.i18n import LocalizationContext
14from couchers.i18n.locales import DEFAULT_LOCALE
15from couchers.interceptors import (
16 CouchersMiddlewareInterceptor,
17 _try_get_and_update_user_details,
18 check_permissions,
19 find_auth_level,
20)
21from couchers.proto import (
22 account_pb2_grpc,
23 admin_pb2_grpc,
24 api_pb2_grpc,
25 auth_pb2_grpc,
26 blocking_pb2_grpc,
27 bugs_pb2_grpc,
28 communities_pb2_grpc,
29 conversations_pb2_grpc,
30 discussions_pb2_grpc,
31 donations_pb2_grpc,
32 editor_pb2_grpc,
33 events_pb2_grpc,
34 galleries_pb2_grpc,
35 gis_pb2_grpc,
36 groups_pb2_grpc,
37 iris_pb2_grpc,
38 jail_pb2_grpc,
39 media_pb2_grpc,
40 moderation_pb2_grpc,
41 notifications_pb2_grpc,
42 pages_pb2_grpc,
43 postal_verification_pb2_grpc,
44 public_pb2_grpc,
45 references_pb2_grpc,
46 reporting_pb2_grpc,
47 requests_pb2_grpc,
48 resources_pb2_grpc,
49 search_pb2_grpc,
50 stripe_pb2_grpc,
51 threads_pb2_grpc,
52)
53from couchers.servicers.account import Account, Iris
54from couchers.servicers.admin import Admin
55from couchers.servicers.api import API
56from couchers.servicers.auth import Auth
57from couchers.servicers.blocking import Blocking
58from couchers.servicers.bugs import Bugs
59from couchers.servicers.communities import Communities
60from couchers.servicers.conversations import Conversations
61from couchers.servicers.discussions import Discussions
62from couchers.servicers.donations import Donations, Stripe
63from couchers.servicers.editor import Editor
64from couchers.servicers.events import Events
65from couchers.servicers.galleries import Galleries
66from couchers.servicers.gis import GIS
67from couchers.servicers.groups import Groups
68from couchers.servicers.jail import Jail
69from couchers.servicers.media import Media, get_media_auth_interceptor
70from couchers.servicers.moderation import Moderation
71from couchers.servicers.notifications import Notifications
72from couchers.servicers.pages import Pages
73from couchers.servicers.postal_verification import PostalVerification
74from couchers.servicers.public import Public
75from couchers.servicers.references import References
76from couchers.servicers.reporting import Reporting
77from couchers.servicers.requests import Requests
78from couchers.servicers.resources import Resources
79from couchers.servicers.search import Search
80from couchers.servicers.threads import Threads
83class _MockCouchersContext:
84 @property
85 def headers(self):
86 return {}
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Auth metadata plugin that attaches the session token to each call as a
    `cookie: couchers-sesh=<token>` metadata entry.
    """

    def __init__(self, token: str):
        # Kept as a public attribute; it is read again on every invocation.
        self.token = token

    def __call__(self, context, callback) -> None:
        """
        Hand the cookie metadata to `callback`; `context` is not used.
        """
        cookie_value = f"couchers-sesh={self.token}"
        metadata = (("cookie", cookie_value),)
        # The second argument is the error slot; None means "no error".
        callback(metadata, None)
class MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Client interceptor that remembers the initial metadata (response headers)
    of the most recent unary-unary call.

    Attributes:
        latest_headers: initial metadata of the last call, collapsed to a dict.
        latest_header_raw: the same metadata exactly as returned by the call,
            which keeps repeated keys that the dict form would drop.
    """

    def __init__(self):
        self.latest_headers = {}
        # Initialised here so the attribute exists before the first call is
        # intercepted, mirroring `latest_headers`.
        self.latest_header_raw = ()

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """
        Run the call via `continuation`, record its initial metadata, and
        return the call object unchanged.
        """
        call = continuation(client_call_details, request)
        # Fetch once and reuse for both the raw and the dict representation.
        raw_metadata = call.initial_metadata()
        self.latest_headers = dict(raw_metadata)
        self.latest_header_raw = raw_metadata
        return call
class FakeRpcError(grpc.RpcError):
    """
    RpcError carrying a status code and a details string, exposed through
    the `code()` and `details()` accessor methods.
    """

    def __init__(self, code: grpc.StatusCode, details: str):
        self._details = details
        self._code = code

    def code(self) -> grpc.StatusCode:
        """
        Return the status code this error was created with.
        """
        return self._code

    def details(self) -> str:
        """
        Return the details string this error was created with.
        """
        return self._details
class MockGrpcContext:
    """
    Pure mock of grpc.ServicerContext for testing.

    Invocation metadata starts out empty; initial metadata sent by the
    servicer is accumulated in a list.
    """

    def __init__(self):
        self._invocation_metadata: list[tuple[str, str]] = []
        self._initial_metadata = []

    def abort(self, code: grpc.StatusCode, details: str) -> NoReturn:
        """
        Abort the call by raising a FakeRpcError built from `code` and `details`.
        """
        error = FakeRpcError(code, details)
        raise error

    def invocation_metadata(self) -> list[tuple[str, str]]:
        """
        Return the stored invocation metadata list.
        """
        return self._invocation_metadata

    def send_initial_metadata(self, metadata):
        """
        Append every entry of `metadata` to the recorded initial metadata.
        """
        self._initial_metadata.extend(metadata)
class FakeChannel:
    """
    Mock gRPC channel for testing that orchestrates context creation.

    It is passed both to the generated `add_*Servicer_to_server` functions
    (which register method handlers on it) and to the generated stubs (which
    ask it for callables via `unary_unary`). It holds the test state (token)
    and builds a fresh context every time a handler is invoked.
    """

    def __init__(self, token: str | None = None):
        self.handlers: dict[str, Any] = {}
        self._token = token
        self._pool = get_descriptor_pool()

    def add_generic_rpc_handlers(self, generic_rpc_handlers: Any):
        """
        Validate the handlers and register the method handlers of the first one.
        """
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        first_handler = generic_rpc_handlers[0]
        self.handlers.update(first_handler._method_handlers)

    def unary_unary(self, method, request_serializer, response_deserializer):
        """
        Return a callable that invokes the registered handler for `method`
        in-process, taking the request message as its only argument.
        """
        method_handler = self.handlers[method]

        def fake_handler(request):
            # Authenticate the stored token, then enforce the auth level
            # required by the method, before touching the request.
            auth_info = _try_get_and_update_user_details(
                self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
            )
            required_level = find_auth_level(self._pool, method)
            check_permissions(auth_info, required_level)

            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            wire_request = request_serializer(request)
            request = method_handler.request_deserializer(wire_request)

            with session_scope() as session:
                grpc_context = MockGrpcContext()

                # Without auth info the call is anonymous: no user, no token,
                # default locale and UTC.
                if auth_info:
                    user_id = auth_info.user_id
                    token = self._token
                    locale = auth_info.ui_language_preference or DEFAULT_LOCALE
                    timezone_name = auth_info.timezone or "Etc/UTC"
                else:
                    user_id = None
                    token = None
                    locale = DEFAULT_LOCALE
                    timezone_name = "Etc/UTC"

                context = make_interactive_context(
                    grpc_context=grpc_context,
                    user_id=user_id,
                    is_api_key=False,
                    token=token,
                    localization=LocalizationContext(
                        locale=locale,
                        timezone=ZoneInfo(timezone_name),
                    ),
                )

                response = method_handler.unary_unary(request, context, session)

            wire_response = method_handler.response_serializer(response)
            return response_deserializer(wire_response)

        return fake_handler
@contextmanager
def run_server(grpc_channel_options=(), token: str | None = None):
    """
    Start a real gRPC server on a free localhost port and open a channel to it

    Yields a `(server, channel, metadata_interceptor)` tuple. The channel is
    wrapped with a MetadataKeeperInterceptor and, when `token` is truthy, sends
    it as a session cookie on every call. The server is stopped on exit.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) if token else None
        local_creds = grpc.local_channel_credentials()
        if cookie_creds is None:
            creds = local_creds
        else:
            creds = grpc.composite_channel_credentials(local_creds, cookie_creds)

        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        # Port 0 lets the OS pick a free port; the bound port is returned.
        bound_port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        server.start()

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", creds, options=grpc_channel_options) as raw_channel:
                keeper = MetadataKeeperInterceptor()
                intercepted_channel = grpc.intercept_channel(raw_channel, keeper)
                yield server, intercepted_channel, keeper
        finally:
            server.stop(None).wait()
# Sessions that start a real gRPC server.
@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, MetadataKeeperInterceptor]]:
    """
    Create an Auth API for testing

    This needs to use the real server since it plays around with headers.
    Yields the Auth stub together with the interceptor that records the
    response headers of the latest call.
    """
    with run_server(grpc_channel_options) as (srv, chan, header_keeper):
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), srv)
        auth_stub = auth_pb2_grpc.AuthStub(chan)
        yield auth_stub, header_keeper
@contextmanager
def real_api_session(token: str):
    """
    Create an API for testing, using TCP sockets, uses the token for auth

    Yields an APIStub connected to a real server started by `run_server`.
    """
    with run_server(token=token) as (srv, chan, _):
        api_pb2_grpc.add_APIServicer_to_server(API(), srv)
        stub = api_pb2_grpc.APIStub(chan)
        yield stub
@contextmanager
def real_admin_session(token: str):
    """
    Create an Admin service for testing, using TCP sockets, uses the token for auth

    Yields an AdminStub connected to a real server started by `run_server`.
    """
    with run_server(token=token) as (srv, chan, _):
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), srv)
        stub = admin_pb2_grpc.AdminStub(chan)
        yield stub
@contextmanager
def real_editor_session(token: str):
    """
    Create an Editor service for testing, using TCP sockets, uses the token for auth

    Yields an EditorStub connected to a real server started by `run_server`.
    """
    with run_server(token=token) as (srv, chan, _):
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), srv)
        stub = editor_pb2_grpc.EditorStub(chan)
        yield stub
@contextmanager
def real_moderation_session(token: str):
    """
    Create a Moderation service for testing, using TCP sockets, uses the token for auth

    Yields a ModerationStub connected to a real server started by `run_server`.
    """
    with run_server(token=token) as (srv, chan, _):
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), srv)
        stub = moderation_pb2_grpc.ModerationStub(chan)
        yield stub
@contextmanager
def real_account_session(token: str):
    """
    Create an Account service for testing, using TCP sockets, uses the token for auth

    Yields an AccountStub connected to a real server started by `run_server`.
    """
    with run_server(token=token) as (srv, chan, _):
        account_pb2_grpc.add_AccountServicer_to_server(Account(), srv)
        stub = account_pb2_grpc.AccountStub(chan)
        yield stub
@contextmanager
def real_jail_session(token: str):
    """
    Create a Jail service for testing, using TCP sockets, uses the token for auth

    Yields a JailStub connected to a real server started by `run_server`.
    """
    with run_server(token=token) as (srv, chan, _):
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), srv)
        stub = jail_pb2_grpc.JailStub(chan)
        yield stub
@contextmanager
def real_stripe_session():
    """
    Create a Stripe service for testing, using TCP sockets

    No token is passed, so calls carry no session cookie. Yields a StripeStub.
    """
    with run_server() as (srv, chan, _):
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), srv)
        stub = stripe_pb2_grpc.StripeStub(chan)
        yield stub
@contextmanager
def real_iris_session():
    """
    Create an Iris service for testing on a real server, without a token

    Yields an IrisStub connected to a server started by `run_server`.
    """
    with run_server() as (srv, chan, _):
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), srv)
        stub = iris_pb2_grpc.IrisStub(chan)
        yield stub
@contextmanager
def media_session(bearer_token: str):
    """
    Create a fresh Media API for testing, uses the bearer token for media auth

    Starts a dedicated server guarded by the media auth interceptor, yields a
    MediaStub whose calls carry `bearer_token` as access-token credentials,
    and stops the server on exit.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        srv = grpc.server(pool, interceptors=[auth_interceptor])
        # Port 0 lets the OS pick a free port; the bound port is returned.
        bound_port = srv.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), srv)
        srv.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as chan:
                stub = media_pb2_grpc.MediaStub(chan)
                yield stub
        finally:
            srv.stop(None).wait()
# Sessions that don't need to start a real gRPC server.
# Note: these don't need to be context managers, but they are so that
# we can switch to a real implementation if needed.
@contextmanager
def api_session(token: str):
    """
    Create an API for testing, uses the token for auth

    Yields an APIStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake)
    stub = api_pb2_grpc.APIStub(fake)
    yield stub
@contextmanager
def gis_session(token: str):
    """
    Create a GIS API for testing, uses the token for auth

    Yields a GISStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake)
    stub = gis_pb2_grpc.GISStub(fake)
    yield stub
@contextmanager
def public_session():
    """
    Create a Public API for testing, without any auth token

    Yields a PublicStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake)
    stub = public_pb2_grpc.PublicStub(fake)
    yield stub
@contextmanager
def conversations_session(token: str):
    """
    Create a Conversations API for testing, uses the token for auth

    Yields a ConversationsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake)
    stub = conversations_pb2_grpc.ConversationsStub(fake)
    yield stub
@contextmanager
def requests_session(token: str):
    """
    Create a Requests API for testing, uses the token for auth

    Yields a RequestsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake)
    stub = requests_pb2_grpc.RequestsStub(fake)
    yield stub
@contextmanager
def threads_session(token: str):
    """
    Create a Threads API for testing, uses the token for auth

    Yields a ThreadsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake)
    stub = threads_pb2_grpc.ThreadsStub(fake)
    yield stub
@contextmanager
def discussions_session(token: str):
    """
    Create a Discussions API for testing, uses the token for auth

    Yields a DiscussionsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake)
    stub = discussions_pb2_grpc.DiscussionsStub(fake)
    yield stub
@contextmanager
def donations_session(token: str):
    """
    Create a Donations API for testing, uses the token for auth

    Yields a DonationsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake)
    stub = donations_pb2_grpc.DonationsStub(fake)
    yield stub
@contextmanager
def pages_session(token: str):
    """
    Create a Pages API for testing, uses the token for auth

    Yields a PagesStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake)
    stub = pages_pb2_grpc.PagesStub(fake)
    yield stub
@contextmanager
def communities_session(token: str):
    """
    Create a Communities API for testing, uses the token for auth

    Yields a CommunitiesStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake)
    stub = communities_pb2_grpc.CommunitiesStub(fake)
    yield stub
@contextmanager
def groups_session(token: str):
    """
    Create a Groups API for testing, uses the token for auth

    Yields a GroupsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake)
    stub = groups_pb2_grpc.GroupsStub(fake)
    yield stub
@contextmanager
def blocking_session(token: str):
    """
    Create a Blocking API for testing, uses the token for auth

    Yields a BlockingStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake)
    stub = blocking_pb2_grpc.BlockingStub(fake)
    yield stub
@contextmanager
def notifications_session(token: str):
    """
    Create a Notifications API for testing, uses the token for auth

    Yields a NotificationsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake)
    stub = notifications_pb2_grpc.NotificationsStub(fake)
    yield stub
@contextmanager
def account_session(token: str):
    """
    Create an Account API for testing, uses the token for auth

    Yields an AccountStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake)
    stub = account_pb2_grpc.AccountStub(fake)
    yield stub
@contextmanager
def search_session(token: str):
    """
    Create a Search API for testing, uses the token for auth

    Yields a SearchStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake)
    stub = search_pb2_grpc.SearchStub(fake)
    yield stub
@contextmanager
def references_session(token: str):
    """
    Create a References API for testing, uses the token for auth

    Yields a ReferencesStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake)
    stub = references_pb2_grpc.ReferencesStub(fake)
    yield stub
@contextmanager
def galleries_session(token: str):
    """
    Create a Galleries API for testing, uses the token for auth

    Yields a GalleriesStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake)
    stub = galleries_pb2_grpc.GalleriesStub(fake)
    yield stub
@contextmanager
def reporting_session(token: str):
    """
    Create a Reporting API for testing, uses the token for auth

    Yields a ReportingStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake)
    stub = reporting_pb2_grpc.ReportingStub(fake)
    yield stub
@contextmanager
def events_session(token: str):
    """
    Create an Events API for testing, uses the token for auth

    Yields an EventsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake)
    stub = events_pb2_grpc.EventsStub(fake)
    yield stub
@contextmanager
def postal_verification_session(token: str):
    """
    Create a PostalVerification API for testing, uses the token for auth

    Yields a PostalVerificationStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), fake)
    stub = postal_verification_pb2_grpc.PostalVerificationStub(fake)
    yield stub
@contextmanager
def bugs_session(token: str | None = None):
    """
    Create a Bugs API for testing; the auth token is optional

    Yields a BugsStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake)
    stub = bugs_pb2_grpc.BugsStub(fake)
    yield stub
@contextmanager
def resources_session():
    """
    Create a Resources API for testing, without any auth token

    Yields a ResourcesStub backed by an in-process FakeChannel.
    """
    fake = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake)
    stub = resources_pb2_grpc.ResourcesStub(fake)
    yield stub