Coverage for app / backend / src / tests / fixtures / sessions.py: 99%
279 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1from collections.abc import Generator
2from concurrent import futures
3from contextlib import contextmanager
4from typing import Any, NoReturn
6import grpc
7from grpc._server import _validate_generic_rpc_handlers
9from couchers.context import make_interactive_context
10from couchers.db import session_scope
11from couchers.descriptor_pool import get_descriptor_pool
12from couchers.interceptors import (
13 CouchersMiddlewareInterceptor,
14 _try_get_and_update_user_details,
15 check_permissions,
16 find_auth_level,
17)
18from couchers.proto import (
19 account_pb2_grpc,
20 admin_pb2_grpc,
21 api_pb2_grpc,
22 auth_pb2_grpc,
23 blocking_pb2_grpc,
24 bugs_pb2_grpc,
25 communities_pb2_grpc,
26 conversations_pb2_grpc,
27 discussions_pb2_grpc,
28 donations_pb2_grpc,
29 editor_pb2_grpc,
30 events_pb2_grpc,
31 galleries_pb2_grpc,
32 gis_pb2_grpc,
33 groups_pb2_grpc,
34 iris_pb2_grpc,
35 jail_pb2_grpc,
36 media_pb2_grpc,
37 moderation_pb2_grpc,
38 notifications_pb2_grpc,
39 pages_pb2_grpc,
40 postal_verification_pb2_grpc,
41 public_pb2_grpc,
42 references_pb2_grpc,
43 reporting_pb2_grpc,
44 requests_pb2_grpc,
45 resources_pb2_grpc,
46 search_pb2_grpc,
47 stripe_pb2_grpc,
48 threads_pb2_grpc,
49)
50from couchers.servicers.account import Account, Iris
51from couchers.servicers.admin import Admin
52from couchers.servicers.api import API
53from couchers.servicers.auth import Auth
54from couchers.servicers.blocking import Blocking
55from couchers.servicers.bugs import Bugs
56from couchers.servicers.communities import Communities
57from couchers.servicers.conversations import Conversations
58from couchers.servicers.discussions import Discussions
59from couchers.servicers.donations import Donations, Stripe
60from couchers.servicers.editor import Editor
61from couchers.servicers.events import Events
62from couchers.servicers.galleries import Galleries
63from couchers.servicers.gis import GIS
64from couchers.servicers.groups import Groups
65from couchers.servicers.jail import Jail
66from couchers.servicers.media import Media, get_media_auth_interceptor
67from couchers.servicers.moderation import Moderation
68from couchers.servicers.notifications import Notifications
69from couchers.servicers.pages import Pages
70from couchers.servicers.postal_verification import PostalVerification
71from couchers.servicers.public import Public
72from couchers.servicers.references import References
73from couchers.servicers.reporting import Reporting
74from couchers.servicers.requests import Requests
75from couchers.servicers.resources import Resources
76from couchers.servicers.search import Search
77from couchers.servicers.threads import Threads
class _MockCouchersContext:
    """
    Minimal context stand-in that only exposes an (always empty) `headers` mapping.
    """

    @property
    def headers(self):
        """
        Return a fresh empty dict on every access.
        """
        return dict()
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Call-credentials plugin that adds the `cookie: couchers-sesh=<token>`
    header to the metadata of outgoing requests.
    """

    def __init__(self, token: str):
        self.token = token

    def __call__(self, context, callback) -> None:
        cookie_header = ("cookie", f"couchers-sesh={self.token}")
        # The callback takes (metadata, error); None in the error slot reports success.
        callback((cookie_header,), None)
class MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Client interceptor that remembers the initial metadata of the most
    recent unary-unary call so tests can inspect it afterwards.

    Attributes:
        latest_headers: the initial metadata converted with `dict(...)`; if a
            key occurs more than once only its last value survives.
        latest_header_raw: the initial metadata exactly as returned by the
            call, untouched by the dict conversion.
    """

    def __init__(self):
        self.latest_headers = {}
        # Defined up front (empty) so that reading it before any call has been
        # intercepted does not raise AttributeError.
        self.latest_header_raw = ()

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """
        Run the call via `continuation`, record its initial metadata, and
        return the call object unchanged.
        """
        call = continuation(client_call_details, request)
        # Fetch the metadata once and derive both views from the same object.
        raw_metadata = call.initial_metadata()
        self.latest_headers = dict(raw_metadata)
        self.latest_header_raw = raw_metadata
        return call
class FakeRpcError(grpc.RpcError):
    """
    RpcError substitute that carries a status code and a details string,
    exposed through the `code()` and `details()` accessors.
    """

    def __init__(self, code: grpc.StatusCode, details: str):
        self._code, self._details = code, details

    def code(self) -> grpc.StatusCode:
        """
        Return the status code given at construction.
        """
        return self._code

    def details(self) -> str:
        """
        Return the details string given at construction.
        """
        return self._details
class MockGrpcContext:
    """
    In-memory substitute for grpc.ServicerContext for use in tests.

    Metadata is kept in plain lists and `abort` raises a FakeRpcError.
    """

    def __init__(self):
        self._invocation_metadata: list[tuple[str, str]] = []
        self._initial_metadata = []

    def abort(self, code: grpc.StatusCode, details: str) -> NoReturn:
        """
        Raise a FakeRpcError carrying the given code and details.
        """
        raise FakeRpcError(code, details)

    def invocation_metadata(self) -> list[tuple[str, str]]:
        """
        Return the stored invocation metadata list (empty unless set on the instance).
        """
        return self._invocation_metadata

    def send_initial_metadata(self, metadata):
        """
        Append every item of `metadata` to the recorded initial metadata.
        """
        self._initial_metadata += metadata
class FakeChannel:
    """
    In-process stand-in for a gRPC channel (and for the server that servicers
    register with).

    It stores the session token used by the test and, each time a stub method
    is invoked, performs the auth lookup and permission check, builds an
    interactive context and calls the registered handler directly.
    """

    def __init__(self, token: str | None = None):
        self._token = token
        self._pool = get_descriptor_pool()
        self.handlers: dict[str, Any] = {}

    def add_generic_rpc_handlers(self, generic_rpc_handlers: Any):
        """
        Validate the handlers and register the method handlers of the first one.
        """
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        first_handler = generic_rpc_handlers[0]
        self.handlers.update(first_handler._method_handlers)

    def unary_unary(self, method, request_serializer, response_deserializer):
        """
        Return a callable that invokes the handler registered for `method`.

        Raises KeyError if no handler has been registered under `method`.
        """
        handler = self.handlers[method]

        def fake_handler(request):
            # Authenticate and authorise first, before any handler code runs.
            auth_info = _try_get_and_update_user_details(
                self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
            )
            check_permissions(auth_info, find_auth_level(self._pool, method))

            # Round-trip the request through its wire format so that
            # unserializable data is caught, as it would be over a real channel.
            wire_request = request_serializer(request)
            request = handler.request_deserializer(wire_request)

            with session_scope() as session:
                if auth_info:
                    user_id = auth_info.user_id
                    token = self._token
                    language = auth_info.ui_language_preference
                else:
                    user_id = token = language = None

                context = make_interactive_context(
                    grpc_context=MockGrpcContext(),
                    user_id=user_id,
                    is_api_key=False,
                    token=token,
                    ui_language_preference=language,
                )

                response = handler.unary_unary(request, context, session)

            # Same wire-format round trip for the response.
            wire_response = handler.response_serializer(response)
            return response_deserializer(wire_response)

        return fake_handler
@contextmanager
def run_server(grpc_channel_options=(), token: str | None = None):
    """
    Start a real gRPC server on localhost and yield `(server, channel, interceptor)`.

    The server binds to `localhost:0` and the channel connects to the port
    returned by `add_secure_port`. When `token` is truthy, the channel also
    sends it as a session cookie via CookieMetadataPlugin. The yielded channel
    is wrapped with a MetadataKeeperInterceptor, which is yielded as the third
    item. The server is stopped when the context exits.

    NOTE(review): callers register their servicers on the yielded server after
    `start()` has run; gRPC documents handler registration as only safe before
    start - confirm this is acceptable for these tests.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        creds = grpc.local_channel_credentials()
        if token:
            cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
            creds = grpc.composite_channel_credentials(creds, cookie_creds)

        server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        server.start()

        try:
            target = f"localhost:{port}"
            with grpc.secure_channel(target, creds, options=grpc_channel_options) as raw_channel:
                keeper = MetadataKeeperInterceptor()
                intercepted = grpc.intercept_channel(raw_channel, keeper)
                yield server, intercepted, keeper
        finally:
            # Stop immediately (no grace period) and wait until shutdown completes.
            server.stop(None).wait()
209# Sessions that start a real GRPC server.
@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, MetadataKeeperInterceptor]]:
    """
    Yield an Auth stub together with the interceptor that records response metadata.

    Uses the real server (no token) because the Auth service works with headers.
    """
    with run_server(grpc_channel_options) as (srv, chan, header_keeper):
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), srv)
        stub = auth_pb2_grpc.AuthStub(chan)
        yield stub, header_keeper
@contextmanager
def real_api_session(token: str):
    """
    Yield an API stub backed by a real gRPC server over TCP, authenticated with the token.
    """
    with run_server(token=token) as (srv, chan, _):
        api_pb2_grpc.add_APIServicer_to_server(API(), srv)
        stub = api_pb2_grpc.APIStub(chan)
        yield stub
@contextmanager
def real_admin_session(token: str):
    """
    Yield an Admin stub backed by a real gRPC server over TCP, authenticated with the token.
    """
    with run_server(token=token) as (srv, chan, _):
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), srv)
        stub = admin_pb2_grpc.AdminStub(chan)
        yield stub
@contextmanager
def real_editor_session(token: str):
    """
    Yield an Editor stub backed by a real gRPC server over TCP, authenticated with the token.
    """
    with run_server(token=token) as (srv, chan, _):
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), srv)
        stub = editor_pb2_grpc.EditorStub(chan)
        yield stub
@contextmanager
def real_moderation_session(token: str):
    """
    Yield a Moderation stub backed by a real gRPC server over TCP, authenticated with the token.
    """
    with run_server(token=token) as (srv, chan, _):
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), srv)
        stub = moderation_pb2_grpc.ModerationStub(chan)
        yield stub
@contextmanager
def real_account_session(token: str):
    """
    Yield an Account stub backed by a real gRPC server over TCP, authenticated with the token.
    """
    with run_server(token=token) as (srv, chan, _):
        account_pb2_grpc.add_AccountServicer_to_server(Account(), srv)
        stub = account_pb2_grpc.AccountStub(chan)
        yield stub
@contextmanager
def real_jail_session(token: str):
    """
    Yield a Jail stub backed by a real gRPC server over TCP, authenticated with the token.
    """
    with run_server(token=token) as (srv, chan, _):
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), srv)
        stub = jail_pb2_grpc.JailStub(chan)
        yield stub
@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub backed by a real gRPC server over TCP; no token is sent.
    """
    with run_server() as (srv, chan, _):
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), srv)
        stub = stripe_pb2_grpc.StripeStub(chan)
        yield stub
@contextmanager
def real_iris_session():
    """
    Yield an Iris stub backed by a real gRPC server over TCP; no token is sent.
    """
    with run_server() as (srv, chan, _):
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), srv)
        stub = iris_pb2_grpc.IrisStub(chan)
        yield stub
@contextmanager
def media_session(bearer_token: str):
    """
    Yield a Media stub backed by a fresh real gRPC server.

    The bearer token configures the server-side media auth interceptor and is
    also sent by the client as access-token call credentials. The server is
    stopped when the context exits.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        srv = grpc.server(pool, interceptors=[auth_interceptor])
        port = srv.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), srv)
        srv.start()

        channel_creds = grpc.composite_channel_credentials(
            grpc.local_channel_credentials(),
            grpc.access_token_call_credentials(bearer_token),
        )

        try:
            target = f"localhost:{port}"
            with grpc.secure_channel(target, channel_creds) as chan:
                stub = media_pb2_grpc.MediaStub(chan)
                yield stub
        finally:
            # Stop immediately (no grace period) and wait until shutdown completes.
            srv.stop(None).wait()
324# Sessions that don't need to start a real GRPC server.
325# Note: these don't need to be context managers, but they are so that
326# we can switch to a real implementation if needed.
@contextmanager
def api_session(token: str):
    """
    Yield an API stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake_channel)
    stub = api_pb2_grpc.APIStub(fake_channel)
    yield stub
@contextmanager
def gis_session(token: str):
    """
    Yield a GIS stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake_channel)
    stub = gis_pb2_grpc.GISStub(fake_channel)
    yield stub
@contextmanager
def public_session():
    """
    Yield a Public stub bound to an in-process FakeChannel created without a token.
    """
    fake_channel = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake_channel)
    stub = public_pb2_grpc.PublicStub(fake_channel)
    yield stub
@contextmanager
def conversations_session(token: str):
    """
    Yield a Conversations stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake_channel)
    stub = conversations_pb2_grpc.ConversationsStub(fake_channel)
    yield stub
@contextmanager
def requests_session(token: str):
    """
    Yield a Requests stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake_channel)
    stub = requests_pb2_grpc.RequestsStub(fake_channel)
    yield stub
@contextmanager
def threads_session(token: str):
    """
    Yield a Threads stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake_channel)
    stub = threads_pb2_grpc.ThreadsStub(fake_channel)
    yield stub
@contextmanager
def discussions_session(token: str):
    """
    Yield a Discussions stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake_channel)
    stub = discussions_pb2_grpc.DiscussionsStub(fake_channel)
    yield stub
@contextmanager
def donations_session(token: str):
    """
    Yield a Donations stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake_channel)
    stub = donations_pb2_grpc.DonationsStub(fake_channel)
    yield stub
@contextmanager
def pages_session(token: str):
    """
    Yield a Pages stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake_channel)
    stub = pages_pb2_grpc.PagesStub(fake_channel)
    yield stub
@contextmanager
def communities_session(token: str):
    """
    Yield a Communities stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake_channel)
    stub = communities_pb2_grpc.CommunitiesStub(fake_channel)
    yield stub
@contextmanager
def groups_session(token: str):
    """
    Yield a Groups stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake_channel)
    stub = groups_pb2_grpc.GroupsStub(fake_channel)
    yield stub
@contextmanager
def blocking_session(token: str):
    """
    Yield a Blocking stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake_channel)
    stub = blocking_pb2_grpc.BlockingStub(fake_channel)
    yield stub
@contextmanager
def notifications_session(token: str):
    """
    Yield a Notifications stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake_channel)
    stub = notifications_pb2_grpc.NotificationsStub(fake_channel)
    yield stub
@contextmanager
def account_session(token: str):
    """
    Yield an Account stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake_channel)
    stub = account_pb2_grpc.AccountStub(fake_channel)
    yield stub
@contextmanager
def search_session(token: str):
    """
    Yield a Search stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake_channel)
    stub = search_pb2_grpc.SearchStub(fake_channel)
    yield stub
@contextmanager
def references_session(token: str):
    """
    Yield a References stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake_channel)
    stub = references_pb2_grpc.ReferencesStub(fake_channel)
    yield stub
@contextmanager
def galleries_session(token: str):
    """
    Yield a Galleries stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake_channel)
    stub = galleries_pb2_grpc.GalleriesStub(fake_channel)
    yield stub
@contextmanager
def reporting_session(token: str):
    """
    Yield a Reporting stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake_channel)
    stub = reporting_pb2_grpc.ReportingStub(fake_channel)
    yield stub
@contextmanager
def events_session(token: str):
    """
    Yield an Events stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake_channel)
    stub = events_pb2_grpc.EventsStub(fake_channel)
    yield stub
@contextmanager
def postal_verification_session(token: str):
    """
    Yield a PostalVerification stub bound to an in-process FakeChannel that uses the token for auth.
    """
    fake_channel = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), fake_channel)
    stub = postal_verification_pb2_grpc.PostalVerificationStub(fake_channel)
    yield stub
@contextmanager
def bugs_session(token: str | None = None):
    """
    Yield a Bugs stub bound to an in-process FakeChannel; the token is optional.
    """
    fake_channel = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake_channel)
    stub = bugs_pb2_grpc.BugsStub(fake_channel)
    yield stub
@contextmanager
def resources_session():
    """
    Yield a Resources stub bound to an in-process FakeChannel created without a token.
    """
    fake_channel = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake_channel)
    stub = resources_pb2_grpc.ResourcesStub(fake_channel)
    yield stub