Coverage for src / tests / fixtures / sessions.py: 99%
289 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 16:21 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 16:21 +0000
1from collections.abc import Generator
2from concurrent import futures
3from contextlib import contextmanager
4from typing import Any
6import grpc
7from grpc._server import _validate_generic_rpc_handlers
9from couchers.context import make_interactive_context
10from couchers.db import session_scope
11from couchers.descriptor_pool import get_descriptor_pool
12from couchers.interceptors import CouchersMiddlewareInterceptor, UserAuthInfo, _try_get_and_update_user_details
13from couchers.proto import (
14 account_pb2_grpc,
15 admin_pb2_grpc,
16 annotations_pb2,
17 api_pb2_grpc,
18 auth_pb2_grpc,
19 blocking_pb2_grpc,
20 bugs_pb2_grpc,
21 communities_pb2_grpc,
22 conversations_pb2_grpc,
23 discussions_pb2_grpc,
24 donations_pb2_grpc,
25 editor_pb2_grpc,
26 events_pb2_grpc,
27 galleries_pb2_grpc,
28 gis_pb2_grpc,
29 groups_pb2_grpc,
30 iris_pb2_grpc,
31 jail_pb2_grpc,
32 media_pb2_grpc,
33 moderation_pb2_grpc,
34 notifications_pb2_grpc,
35 pages_pb2_grpc,
36 postal_verification_pb2_grpc,
37 public_pb2_grpc,
38 references_pb2_grpc,
39 reporting_pb2_grpc,
40 requests_pb2_grpc,
41 resources_pb2_grpc,
42 search_pb2_grpc,
43 stripe_pb2_grpc,
44 threads_pb2_grpc,
45)
46from couchers.servicers.account import Account, Iris
47from couchers.servicers.admin import Admin
48from couchers.servicers.api import API
49from couchers.servicers.auth import Auth
50from couchers.servicers.blocking import Blocking
51from couchers.servicers.bugs import Bugs
52from couchers.servicers.communities import Communities
53from couchers.servicers.conversations import Conversations
54from couchers.servicers.discussions import Discussions
55from couchers.servicers.donations import Donations, Stripe
56from couchers.servicers.editor import Editor
57from couchers.servicers.events import Events
58from couchers.servicers.galleries import Galleries
59from couchers.servicers.gis import GIS
60from couchers.servicers.groups import Groups
61from couchers.servicers.jail import Jail
62from couchers.servicers.media import Media, get_media_auth_interceptor
63from couchers.servicers.moderation import Moderation
64from couchers.servicers.notifications import Notifications
65from couchers.servicers.pages import Pages
66from couchers.servicers.postal_verification import PostalVerification
67from couchers.servicers.public import Public
68from couchers.servicers.references import References
69from couchers.servicers.reporting import Reporting
70from couchers.servicers.requests import Requests
71from couchers.servicers.resources import Resources
72from couchers.servicers.search import Search
73from couchers.servicers.threads import Threads
76class _MockCouchersContext:
77 @property
78 def headers(self):
79 return {}
class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Call-credentials plugin that adds a `cookie: couchers-sesh=<token>` entry to the call metadata
    """

    def __init__(self, token: str):
        # The session token that gets embedded in the cookie header.
        self.token = token

    def __call__(self, context, callback) -> None:
        # Hand a one-element metadata tuple to the callback; the second argument (error) is None.
        cookie_header = ("cookie", f"couchers-sesh={self.token}")
        callback((cookie_header,), None)
class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Client interceptor that remembers the initial metadata of the most recent unary-unary call.

    `latest_headers` holds that metadata converted with `dict(...)`, while
    `latest_header_raw` holds it exactly as returned by `call.initial_metadata()`.
    """

    def __init__(self):
        self.latest_headers = {}
        # Initialised here so that reading it before the first intercepted call
        # yields an empty sequence rather than raising AttributeError.
        self.latest_header_raw = ()

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """
        Run the call via `continuation`, record its initial metadata, and return the call.
        """
        call = continuation(client_call_details, request)
        # Fetch the metadata once and store both representations from the same value.
        initial_metadata = call.initial_metadata()
        self.latest_headers = dict(initial_metadata)
        self.latest_header_raw = initial_metadata
        return call
class FakeRpcError(grpc.RpcError):
    """
    RpcError carrying a status code and a details string, exposed via `code()` and `details()`.
    """

    def __init__(self, code, details):
        self._code, self._details = code, details

    def code(self):
        """Return the status code this error was created with."""
        return self._code

    def details(self):
        """Return the details value this error was created with."""
        return self._details
def _check_user_perms(method: str, auth_info: UserAuthInfo | None) -> None:
    """
    Assert that the caller described by `auth_info` is allowed to call `method`.

    The service's `auth_level` option is looked up in the descriptor pool and
    compared against the caller's flags. A missing `auth_info` (unauthenticated
    caller) is only acceptable for services with the open auth level.

    Raises AssertionError when any of the checks fails.
    """
    # method is of the form "/org.couchers.api.core.API/GetUser"
    _, service_name, _method_name = method.split("/")

    service_descriptor = get_descriptor_pool().FindServiceByName(service_name)
    auth_level = service_descriptor.GetOptions().Extensions[annotations_pb2.auth_level]

    # The service must declare one of the known auth levels.
    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_EDITOR,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    ]

    if not auth_info:
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
        return

    assert auth_level != annotations_pb2.AUTH_LEVEL_ADMIN or auth_info.is_superuser, (
        "Non-superuser tried to call superuser API"
    )
    assert auth_level != annotations_pb2.AUTH_LEVEL_EDITOR or auth_info.is_editor, (
        "Non-editor tried to call editor API"
    )
    assert not auth_info.is_jailed or auth_level in [
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
    ], "User is jailed but tried to call non-open/non-jailed API"
class MockGrpcContext:
    """
    In-memory substitute for grpc.ServicerContext, used by the tests.
    """

    def __init__(self):
        # Metadata passed to send_initial_metadata accumulates in the first list;
        # the second is what invocation_metadata() returns.
        self._initial_metadata, self._invocation_metadata = [], []

    def abort(self, code, details):
        """Raise a FakeRpcError carrying `code` and `details`."""
        raise FakeRpcError(code, details)

    def invocation_metadata(self):
        """Return the stored invocation metadata list."""
        return self._invocation_metadata

    def send_initial_metadata(self, metadata):
        """Append every item of `metadata` to the recorded initial metadata."""
        self._initial_metadata += metadata
166class FakeChannel:
167 """
168 Mock gRPC channel for testing that orchestrates context creation.
170 This holds test state (token) and creates proper CouchersContext
171 instances when handlers are invoked.
172 """
174 def __init__(self, token: str | None = None):
175 self.handlers: dict[str, Any] = {}
176 self._token = token
178 def add_generic_rpc_handlers(self, generic_rpc_handlers: Any):
179 _validate_generic_rpc_handlers(generic_rpc_handlers)
180 self.handlers.update(generic_rpc_handlers[0]._method_handlers)
182 def unary_unary(self, uri, request_serializer, response_deserializer):
183 handler = self.handlers[uri]
185 def fake_handler(request):
186 auth_info = _try_get_and_update_user_details(
187 self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
188 )
190 _check_user_perms(uri, auth_info)
192 # Do a full serialization cycle on the request and the
193 # response to catch accidental use of unserializable data.
194 request = handler.request_deserializer(request_serializer(request))
196 with session_scope() as session:
197 mock_grpc_ctx = MockGrpcContext()
199 context = make_interactive_context(
200 grpc_context=mock_grpc_ctx,
201 user_id=auth_info.user_id if auth_info else None,
202 is_api_key=False,
203 token=self._token if auth_info else None,
204 ui_language_preference=auth_info.ui_language_preference if auth_info else None,
205 )
207 response = handler.unary_unary(request, context, session)
209 return response_deserializer(handler.response_serializer(response))
211 return fake_handler
@contextmanager
def run_server(grpc_channel_options=(), token: str | None = None):
    """
    Start a real gRPC server on a free localhost port and open a client channel to it.

    Yields a `(server, channel, metadata_interceptor)` tuple, where the channel
    is wrapped with a `_MetadataKeeperInterceptor`. When `token` is truthy, the
    channel credentials also carry a `CookieMetadataPlugin` for that token.
    The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        channel_creds = grpc.local_channel_credentials()
        if token:
            cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
            channel_creds = grpc.composite_channel_credentials(channel_creds, cookie_creds)

        server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # Port 0 requests any free port; the bound port number is returned.
        bound_port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        server.start()

        try:
            target = f"localhost:{bound_port}"
            with grpc.secure_channel(target, channel_creds, options=grpc_channel_options) as raw_channel:
                keeper = _MetadataKeeperInterceptor()
                yield server, grpc.intercept_channel(raw_channel, keeper), keeper
        finally:
            server.stop(None).wait()
236# Sessions that start a real GRPC server.
@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, grpc.UnaryUnaryClientInterceptor]]:
    """
    Yield an Auth stub together with the interceptor that records response metadata

    This goes through the real server because the Auth tests work with headers
    """
    with run_server(grpc_channel_options) as (srv, client_channel, keeper):
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), srv)
        auth_stub = auth_pb2_grpc.AuthStub(client_channel)
        yield auth_stub, keeper
@contextmanager
def real_api_session(token: str):
    """
    Yield an API stub connected to a real server over TCP sockets, authenticated with the token
    """
    with run_server(token=token) as (srv, client_channel, _keeper):
        api_pb2_grpc.add_APIServicer_to_server(API(), srv)
        stub = api_pb2_grpc.APIStub(client_channel)
        yield stub
@contextmanager
def real_admin_session(token: str):
    """
    Yield an Admin stub connected to a real server over TCP sockets, authenticated with the token
    """
    with run_server(token=token) as (srv, client_channel, _keeper):
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), srv)
        stub = admin_pb2_grpc.AdminStub(client_channel)
        yield stub
@contextmanager
def real_editor_session(token: str):
    """
    Yield an Editor stub connected to a real server over TCP sockets, authenticated with the token
    """
    with run_server(token=token) as (srv, client_channel, _keeper):
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), srv)
        stub = editor_pb2_grpc.EditorStub(client_channel)
        yield stub
@contextmanager
def real_moderation_session(token: str):
    """
    Yield a Moderation stub connected to a real server over TCP sockets, authenticated with the token
    """
    with run_server(token=token) as (srv, client_channel, _keeper):
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), srv)
        stub = moderation_pb2_grpc.ModerationStub(client_channel)
        yield stub
@contextmanager
def real_account_session(token: str):
    """
    Yield an Account stub connected to a real server over TCP sockets, authenticated with the token
    """
    with run_server(token=token) as (srv, client_channel, _keeper):
        account_pb2_grpc.add_AccountServicer_to_server(Account(), srv)
        stub = account_pb2_grpc.AccountStub(client_channel)
        yield stub
@contextmanager
def real_jail_session(token: str):
    """
    Yield a Jail stub connected to a real server over TCP sockets, authenticated with the token
    """
    with run_server(token=token) as (srv, client_channel, _keeper):
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), srv)
        stub = jail_pb2_grpc.JailStub(client_channel)
        yield stub
@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub connected to a real server over TCP sockets (no token is passed)
    """
    with run_server() as (srv, client_channel, _keeper):
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), srv)
        stub = stripe_pb2_grpc.StripeStub(client_channel)
        yield stub
@contextmanager
def real_iris_session():
    """
    Yield an Iris stub connected to a real server started via run_server (no token is passed)
    """
    with run_server() as (srv, client_channel, _keeper):
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), srv)
        stub = iris_pb2_grpc.IrisStub(client_channel)
        yield stub
@contextmanager
def media_session(bearer_token: str):
    """
    Yield a Media stub connected to a freshly started Media server

    The bearer token is used both for the server-side media auth interceptor
    and for the client's access-token call credentials. The server is stopped
    when the context exits.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        srv = grpc.server(pool, interceptors=[auth_interceptor])
        # Port 0 requests any free port; the bound port number is returned.
        bound_port = srv.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), srv)
        srv.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as client_channel:
                stub = media_pb2_grpc.MediaStub(client_channel)
                yield stub
        finally:
            srv.stop(None).wait()
351# Sessions that don't need to start a real GRPC server.
352# Note: these don't need to be context managers, but they are so that
353# we can switch to a real implementation if needed.
@contextmanager
def api_session(token: str):
    """
    Yield an API stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake_channel)
    stub = api_pb2_grpc.APIStub(fake_channel)
    yield stub
@contextmanager
def gis_session(token: str):
    """
    Yield a GIS stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake_channel)
    stub = gis_pb2_grpc.GISStub(fake_channel)
    yield stub
@contextmanager
def public_session():
    """
    Yield a Public stub backed by an in-process FakeChannel created without a token
    """
    fake_channel = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake_channel)
    stub = public_pb2_grpc.PublicStub(fake_channel)
    yield stub
@contextmanager
def conversations_session(token: str):
    """
    Yield a Conversations stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake_channel)
    stub = conversations_pb2_grpc.ConversationsStub(fake_channel)
    yield stub
@contextmanager
def requests_session(token: str):
    """
    Yield a Requests stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake_channel)
    stub = requests_pb2_grpc.RequestsStub(fake_channel)
    yield stub
@contextmanager
def threads_session(token: str):
    """
    Yield a Threads stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake_channel)
    stub = threads_pb2_grpc.ThreadsStub(fake_channel)
    yield stub
@contextmanager
def discussions_session(token: str):
    """
    Yield a Discussions stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake_channel)
    stub = discussions_pb2_grpc.DiscussionsStub(fake_channel)
    yield stub
@contextmanager
def donations_session(token: str):
    """
    Yield a Donations stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake_channel)
    stub = donations_pb2_grpc.DonationsStub(fake_channel)
    yield stub
@contextmanager
def pages_session(token: str):
    """
    Yield a Pages stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake_channel)
    stub = pages_pb2_grpc.PagesStub(fake_channel)
    yield stub
@contextmanager
def communities_session(token: str):
    """
    Yield a Communities stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake_channel)
    stub = communities_pb2_grpc.CommunitiesStub(fake_channel)
    yield stub
@contextmanager
def groups_session(token: str):
    """
    Yield a Groups stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake_channel)
    stub = groups_pb2_grpc.GroupsStub(fake_channel)
    yield stub
@contextmanager
def blocking_session(token: str):
    """
    Yield a Blocking stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake_channel)
    stub = blocking_pb2_grpc.BlockingStub(fake_channel)
    yield stub
@contextmanager
def notifications_session(token: str):
    """
    Yield a Notifications stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake_channel)
    stub = notifications_pb2_grpc.NotificationsStub(fake_channel)
    yield stub
@contextmanager
def account_session(token: str):
    """
    Yield an Account stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake_channel)
    stub = account_pb2_grpc.AccountStub(fake_channel)
    yield stub
@contextmanager
def search_session(token: str):
    """
    Yield a Search stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake_channel)
    stub = search_pb2_grpc.SearchStub(fake_channel)
    yield stub
@contextmanager
def references_session(token: str):
    """
    Yield a References stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake_channel)
    stub = references_pb2_grpc.ReferencesStub(fake_channel)
    yield stub
@contextmanager
def galleries_session(token: str):
    """
    Yield a Galleries stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake_channel)
    stub = galleries_pb2_grpc.GalleriesStub(fake_channel)
    yield stub
@contextmanager
def reporting_session(token: str):
    """
    Yield a Reporting stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake_channel)
    stub = reporting_pb2_grpc.ReportingStub(fake_channel)
    yield stub
@contextmanager
def events_session(token: str):
    """
    Yield an Events stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake_channel)
    stub = events_pb2_grpc.EventsStub(fake_channel)
    yield stub
@contextmanager
def postal_verification_session(token: str):
    """
    Yield a PostalVerification stub backed by an in-process FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), fake_channel)
    stub = postal_verification_pb2_grpc.PostalVerificationStub(fake_channel)
    yield stub
@contextmanager
def bugs_session(token: str | None = None):
    """
    Yield a Bugs stub backed by an in-process FakeChannel; the token is optional
    """
    fake_channel = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake_channel)
    stub = bugs_pb2_grpc.BugsStub(fake_channel)
    yield stub
@contextmanager
def resources_session():
    """
    Yield a Resources stub backed by an in-process FakeChannel created without a token
    """
    fake_channel = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake_channel)
    stub = resources_pb2_grpc.ResourcesStub(fake_channel)
    yield stub