Coverage for app/backend/src/tests/fixtures/sessions.py: 99%

295 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1from collections.abc import Generator 

2from concurrent import futures 

3from contextlib import contextmanager 

4from typing import Any, NoReturn 

5from zoneinfo import ZoneInfo 

6 

7import grpc 

8from grpc._server import _validate_generic_rpc_handlers 

9 

10from couchers.context import make_interactive_context 

11from couchers.db import session_scope 

12from couchers.descriptor_pool import get_descriptor_pool 

13from couchers.i18n import LocalizationContext 

14from couchers.i18n.locales import DEFAULT_LOCALE 

15from couchers.interceptors import ( 

16 CouchersMiddlewareInterceptor, 

17 _try_get_and_update_user_details, 

18 check_permissions, 

19 find_auth_level, 

20) 

21from couchers.proto import ( 

22 account_pb2_grpc, 

23 admin_pb2_grpc, 

24 api_pb2_grpc, 

25 auth_pb2_grpc, 

26 blocking_pb2_grpc, 

27 bugs_pb2_grpc, 

28 communities_pb2_grpc, 

29 conversations_pb2_grpc, 

30 discussions_pb2_grpc, 

31 donations_pb2_grpc, 

32 editor_pb2_grpc, 

33 events_pb2_grpc, 

34 galleries_pb2_grpc, 

35 gis_pb2_grpc, 

36 groups_pb2_grpc, 

37 iris_pb2_grpc, 

38 jail_pb2_grpc, 

39 media_pb2_grpc, 

40 moderation_pb2_grpc, 

41 notifications_pb2_grpc, 

42 pages_pb2_grpc, 

43 postal_verification_pb2_grpc, 

44 public_pb2_grpc, 

45 public_trips_pb2_grpc, 

46 references_pb2_grpc, 

47 reporting_pb2_grpc, 

48 requests_pb2_grpc, 

49 resources_pb2_grpc, 

50 search_pb2_grpc, 

51 stripe_pb2_grpc, 

52 threads_pb2_grpc, 

53) 

54from couchers.servicers.account import Account, Iris 

55from couchers.servicers.admin import Admin 

56from couchers.servicers.api import API 

57from couchers.servicers.auth import Auth 

58from couchers.servicers.blocking import Blocking 

59from couchers.servicers.bugs import Bugs 

60from couchers.servicers.communities import Communities 

61from couchers.servicers.conversations import Conversations 

62from couchers.servicers.discussions import Discussions 

63from couchers.servicers.donations import Donations, Stripe 

64from couchers.servicers.editor import Editor 

65from couchers.servicers.events import Events 

66from couchers.servicers.galleries import Galleries 

67from couchers.servicers.gis import GIS 

68from couchers.servicers.groups import Groups 

69from couchers.servicers.jail import Jail 

70from couchers.servicers.media import Media, get_media_auth_interceptor 

71from couchers.servicers.moderation import Moderation 

72from couchers.servicers.notifications import Notifications 

73from couchers.servicers.pages import Pages 

74from couchers.servicers.postal_verification import PostalVerification 

75from couchers.servicers.public import Public 

76from couchers.servicers.public_trips import PublicTrips 

77from couchers.servicers.references import References 

78from couchers.servicers.reporting import Reporting 

79from couchers.servicers.requests import Requests 

80from couchers.servicers.resources import Resources 

81from couchers.servicers.search import Search 

82from couchers.servicers.threads import Threads 

83 

84 

class _MockCouchersContext:
    """
    Stand-in context object that carries no request headers
    """

    @property
    def headers(self):
        # A new empty mapping is built on every access.
        return dict()

    def get_header(self, name):
        # No headers are stored, so every lookup yields None.
        return None

92 

93 

class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Auth metadata plugin that adds a `cookie: couchers-sesh=...` entry to the call metadata
    """

    def __init__(self, token: str):
        # Token interpolated into the cookie value on every call.
        self.token = token

    def __call__(self, context, callback) -> None:
        cookie_value = f"couchers-sesh={self.token}"
        metadata = (("cookie", cookie_value),)
        # Second callback argument is passed as None (no error reported).
        callback(metadata, None)

104 

105 

class MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Client interceptor that remembers the initial metadata of the latest unary-unary call

    After each intercepted call:
    - `latest_headers` holds the call's initial metadata converted to a dict
    - `latest_header_raw` holds the initial metadata exactly as the call returned it
    """

    def __init__(self):
        self.latest_headers = {}
        # Set up front so reading it before the first call gives an empty
        # sequence instead of raising AttributeError.
        self.latest_header_raw = ()

    def intercept_unary_unary(self, continuation, client_call_details, request):
        call = continuation(client_call_details, request)
        # Ask the call for its metadata once and derive both views from it.
        raw_metadata = call.initial_metadata()
        headers = dict(raw_metadata)
        self.latest_headers = headers
        self.latest_header_raw = raw_metadata
        return call

115 

116 

class FakeRpcError(grpc.RpcError):
    """
    RpcError carrying a status code and details, readable through `code()` and `details()`
    """

    def __init__(self, code: grpc.StatusCode, details: str):
        # Fill Exception.args so the status code and details are visible in
        # tracebacks and in str()/repr() of the error.
        super().__init__(code, details)
        self._code = code
        self._details = details

    def code(self) -> grpc.StatusCode:
        """Return the status code given at construction."""
        return self._code

    def details(self) -> str:
        """Return the details string given at construction."""
        return self._details

127 

128 

class MockGrpcContext:
    """
    In-memory substitute for grpc.ServicerContext, used for testing.
    """

    def __init__(self):
        # Everything passed to send_initial_metadata() accumulates here.
        self._initial_metadata = []
        # Handed back unchanged by invocation_metadata(); starts out empty.
        self._invocation_metadata: list[tuple[str, str]] = []

    def abort(self, code: grpc.StatusCode, details: str) -> NoReturn:
        """Raise a FakeRpcError built from the given code and details."""
        error = FakeRpcError(code, details)
        raise error

    def invocation_metadata(self) -> list[tuple[str, str]]:
        """Return the stored invocation metadata list."""
        return self._invocation_metadata

    def send_initial_metadata(self, metadata):
        """Add every entry of `metadata` to the recorded initial metadata."""
        self._initial_metadata += metadata

146 

147 

class FakeChannel:
    """
    Mock gRPC channel for testing that orchestrates context creation.

    This holds the test state (token) and creates proper CouchersContext
    instances when handlers are invoked.
    """

    def __init__(self, token: str | None = None):
        # Maps a method name to the RPC method handler registered for it.
        self.handlers: dict[str, Any] = {}
        self._token = token
        self._pool = get_descriptor_pool()

    def add_generic_rpc_handlers(self, generic_rpc_handlers: Any):
        """
        Validate the given generic handlers and register their method handlers.
        """
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        # Register every handler that was passed in, not just the first one,
        # so that no service is silently dropped.
        for generic_handler in generic_rpc_handlers:
            self.handlers.update(generic_handler._method_handlers)

    def unary_unary(self, method, request_serializer, response_deserializer):
        """
        Return a callable that runs the handler registered for `method` in-process.

        Raises KeyError if no handler has been registered for `method`.
        """
        handler = self.handlers[method]

        def fake_handler(request):
            # Resolve the caller from the token and enforce the method's
            # required auth level before the handler runs.
            auth_info = _try_get_and_update_user_details(
                self._token,
                is_api_key=False,
                ip_address="127.0.0.1",
                user_agent="Testing User-Agent",
                sofa=None,
                client_platform=None,
            )
            auth_level = find_auth_level(self._pool, method)
            check_permissions(auth_info, auth_level)

            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                context = make_interactive_context(
                    grpc_context=MockGrpcContext(),
                    user_id=auth_info.user_id if auth_info else None,
                    is_api_key=False,
                    token=self._token if auth_info else None,
                    localization=LocalizationContext(
                        # Fall back to the defaults when there is no auth info
                        # or the stored preference is empty.
                        locale=(auth_info and auth_info.ui_language_preference) or DEFAULT_LOCALE,
                        timezone=ZoneInfo((auth_info and auth_info.timezone) or "Etc/UTC"),
                    ),
                    sofa="test_sofa_cookie_value",
                )

                response = handler.unary_unary(request, context, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler

202 

203 

@contextmanager
def run_server(grpc_channel_options=(), token: str | None = None):
    """
    Start a gRPC server bound to `localhost:0` and yield (server, channel, metadata interceptor)

    When `token` is truthy, the client channel attaches it through CookieMetadataPlugin.
    The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        channel_creds = grpc.local_channel_credentials()
        if token:
            # Layer the cookie-injecting call credentials on top of the local ones.
            cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
            channel_creds = grpc.composite_channel_credentials(channel_creds, cookie_creds)

        server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        bound_port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        server.start()

        try:
            target = f"localhost:{bound_port}"
            with grpc.secure_channel(target, channel_creds, options=grpc_channel_options) as raw_channel:
                keeper = MetadataKeeperInterceptor()
                intercepted = grpc.intercept_channel(raw_channel, keeper)
                yield server, intercepted, keeper
        finally:
            server.stop(None).wait()

224 

225 

226# Sessions that start a real GRPC server. 

@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, MetadataKeeperInterceptor]]:
    """
    Yield an Auth stub together with the interceptor that records response metadata

    A real server is used here because the Auth tests work with headers
    """
    with run_server(grpc_channel_options) as (srv, chan, keeper):
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), srv)
        stub = auth_pb2_grpc.AuthStub(chan)
        yield stub, keeper

239 

240 

@contextmanager
def real_api_session(token: str):
    """
    Yield an API stub backed by a real server over TCP sockets; the token is used for auth
    """
    with run_server(token=token) as (srv, chan, _keeper):
        api_pb2_grpc.add_APIServicer_to_server(API(), srv)
        stub = api_pb2_grpc.APIStub(chan)
        yield stub

249 

250 

@contextmanager
def real_admin_session(token: str):
    """
    Yield an Admin stub backed by a real server over TCP sockets; the token is used for auth
    """
    with run_server(token=token) as (srv, chan, _keeper):
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), srv)
        stub = admin_pb2_grpc.AdminStub(chan)
        yield stub

259 

260 

@contextmanager
def real_editor_session(token: str):
    """
    Yield an Editor stub backed by a real server over TCP sockets; the token is used for auth
    """
    with run_server(token=token) as (srv, chan, _keeper):
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), srv)
        stub = editor_pb2_grpc.EditorStub(chan)
        yield stub

269 

270 

@contextmanager
def real_moderation_session(token: str):
    """
    Yield a Moderation stub backed by a real server over TCP sockets; the token is used for auth
    """
    with run_server(token=token) as (srv, chan, _keeper):
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), srv)
        stub = moderation_pb2_grpc.ModerationStub(chan)
        yield stub

279 

280 

@contextmanager
def real_account_session(token: str):
    """
    Yield an Account stub backed by a real server over TCP sockets; the token is used for auth
    """
    with run_server(token=token) as (srv, chan, _keeper):
        account_pb2_grpc.add_AccountServicer_to_server(Account(), srv)
        stub = account_pb2_grpc.AccountStub(chan)
        yield stub

289 

290 

@contextmanager
def real_jail_session(token: str):
    """
    Yield a Jail stub backed by a real server over TCP sockets; the token is used for auth
    """
    with run_server(token=token) as (srv, chan, _keeper):
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), srv)
        stub = jail_pb2_grpc.JailStub(chan)
        yield stub

299 

300 

@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub backed by a real server over TCP sockets; no token is passed
    """
    with run_server() as (srv, chan, _keeper):
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), srv)
        stub = stripe_pb2_grpc.StripeStub(chan)
        yield stub

309 

310 

@contextmanager
def real_iris_session():
    """
    Yield an Iris stub backed by a server started with run_server; no token is passed
    """
    with run_server() as (srv, chan, _keeper):
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), srv)
        stub = iris_pb2_grpc.IrisStub(chan)
        yield stub

316 

317 

@contextmanager
def real_bugs_session():
    """
    Yield a Bugs stub plus the metadata-recording interceptor, using a real server

    A real server lets requests carry metadata (HTTP request headers) and lets
    the response's initial metadata (HTTP response headers) be asserted.
    """
    with run_server() as (srv, chan, keeper):
        bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), srv)
        stub = bugs_pb2_grpc.BugsStub(chan)
        yield stub, keeper

327 

328 

@contextmanager
def media_session(bearer_token: str):
    """
    Yield a Media stub connected to a newly started Media server; the bearer token is used for media auth
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        srv = grpc.server(pool, interceptors=[auth_interceptor])
        bound_port = srv.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), srv)
        srv.start()

        # The client sends the same bearer token as access-token call credentials.
        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            target = f"localhost:{bound_port}"
            with grpc.secure_channel(target, channel_creds) as chan:
                yield media_pb2_grpc.MediaStub(chan)
        finally:
            srv.stop(None).wait()

350 

351 

352# Sessions that don't need to start a real GRPC server. 

353# Note: these don't need to be context managers, but they are so that 

354# we can switch to a real implementation if needed. 

@contextmanager
def api_session(token: str):
    """
    Yield an API stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    # The servicer is registered before the stub is built on the same channel.
    api_pb2_grpc.add_APIServicer_to_server(API(), fake)
    stub = api_pb2_grpc.APIStub(fake)
    yield stub

363 

364 

@contextmanager
def gis_session(token: str):
    """
    Yield a GIS stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake)
    stub = gis_pb2_grpc.GISStub(fake)
    yield stub

370 

371 

@contextmanager
def public_session():
    """
    Yield a Public stub wired to a FakeChannel created without a token
    """
    fake = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake)
    stub = public_pb2_grpc.PublicStub(fake)
    yield stub

377 

378 

@contextmanager
def public_trips_session(token: str):
    """
    Yield a PublicTrips stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    public_trips_pb2_grpc.add_PublicTripsServicer_to_server(PublicTrips(), fake)
    stub = public_trips_pb2_grpc.PublicTripsStub(fake)
    yield stub

384 

385 

@contextmanager
def conversations_session(token: str):
    """
    Yield a Conversations stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake)
    stub = conversations_pb2_grpc.ConversationsStub(fake)
    yield stub

394 

395 

@contextmanager
def requests_session(token: str):
    """
    Yield a Requests stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake)
    stub = requests_pb2_grpc.RequestsStub(fake)
    yield stub

404 

405 

@contextmanager
def threads_session(token: str):
    """
    Yield a Threads stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake)
    stub = threads_pb2_grpc.ThreadsStub(fake)
    yield stub

411 

412 

@contextmanager
def discussions_session(token: str):
    """
    Yield a Discussions stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake)
    stub = discussions_pb2_grpc.DiscussionsStub(fake)
    yield stub

418 

419 

@contextmanager
def donations_session(token: str):
    """
    Yield a Donations stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake)
    stub = donations_pb2_grpc.DonationsStub(fake)
    yield stub

425 

426 

@contextmanager
def pages_session(token: str):
    """
    Yield a Pages stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake)
    stub = pages_pb2_grpc.PagesStub(fake)
    yield stub

432 

433 

@contextmanager
def communities_session(token: str):
    """
    Yield a Communities stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake)
    stub = communities_pb2_grpc.CommunitiesStub(fake)
    yield stub

439 

440 

@contextmanager
def groups_session(token: str):
    """
    Yield a Groups stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake)
    stub = groups_pb2_grpc.GroupsStub(fake)
    yield stub

446 

447 

@contextmanager
def blocking_session(token: str):
    """
    Yield a Blocking stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake)
    stub = blocking_pb2_grpc.BlockingStub(fake)
    yield stub

453 

454 

@contextmanager
def notifications_session(token: str):
    """
    Yield a Notifications stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake)
    stub = notifications_pb2_grpc.NotificationsStub(fake)
    yield stub

460 

461 

@contextmanager
def account_session(token: str):
    """
    Yield an Account stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake)
    stub = account_pb2_grpc.AccountStub(fake)
    yield stub

470 

471 

@contextmanager
def search_session(token: str):
    """
    Yield a Search stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake)
    stub = search_pb2_grpc.SearchStub(fake)
    yield stub

480 

481 

@contextmanager
def references_session(token: str):
    """
    Yield a References stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake)
    stub = references_pb2_grpc.ReferencesStub(fake)
    yield stub

490 

491 

@contextmanager
def galleries_session(token: str):
    """
    Yield a Galleries stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake)
    stub = galleries_pb2_grpc.GalleriesStub(fake)
    yield stub

500 

501 

@contextmanager
def reporting_session(token: str):
    """
    Yield a Reporting stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake)
    stub = reporting_pb2_grpc.ReportingStub(fake)
    yield stub

507 

508 

@contextmanager
def events_session(token: str):
    """
    Yield an Events stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake)
    stub = events_pb2_grpc.EventsStub(fake)
    yield stub

514 

515 

@contextmanager
def postal_verification_session(token: str):
    """
    Yield a PostalVerification stub wired to a FakeChannel; the token is used for auth
    """
    fake = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), fake)
    stub = postal_verification_pb2_grpc.PostalVerificationStub(fake)
    yield stub

521 

522 

@contextmanager
def bugs_session(token: str | None = None):
    """
    Yield a Bugs stub wired to a FakeChannel; the token is optional
    """
    fake = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake)
    stub = bugs_pb2_grpc.BugsStub(fake)
    yield stub

528 

529 

@contextmanager
def resources_session():
    """
    Yield a Resources stub wired to a FakeChannel created without a token
    """
    fake = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake)
    stub = resources_pb2_grpc.ResourcesStub(fake)
    yield stub