Coverage for app / backend / src / tests / fixtures / sessions.py: 99%

288 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1from collections.abc import Generator 

2from concurrent import futures 

3from contextlib import contextmanager 

4from typing import Any, NoReturn 

5from zoneinfo import ZoneInfo 

6 

7import grpc 

8from grpc._server import _validate_generic_rpc_handlers 

9 

10from couchers.context import make_interactive_context 

11from couchers.db import session_scope 

12from couchers.descriptor_pool import get_descriptor_pool 

13from couchers.i18n import LocalizationContext 

14from couchers.i18n.locales import DEFAULT_LOCALE 

15from couchers.interceptors import ( 

16 CouchersMiddlewareInterceptor, 

17 _try_get_and_update_user_details, 

18 check_permissions, 

19 find_auth_level, 

20) 

21from couchers.proto import ( 

22 account_pb2_grpc, 

23 admin_pb2_grpc, 

24 api_pb2_grpc, 

25 auth_pb2_grpc, 

26 blocking_pb2_grpc, 

27 bugs_pb2_grpc, 

28 communities_pb2_grpc, 

29 conversations_pb2_grpc, 

30 discussions_pb2_grpc, 

31 donations_pb2_grpc, 

32 editor_pb2_grpc, 

33 events_pb2_grpc, 

34 galleries_pb2_grpc, 

35 gis_pb2_grpc, 

36 groups_pb2_grpc, 

37 iris_pb2_grpc, 

38 jail_pb2_grpc, 

39 media_pb2_grpc, 

40 moderation_pb2_grpc, 

41 notifications_pb2_grpc, 

42 pages_pb2_grpc, 

43 postal_verification_pb2_grpc, 

44 public_pb2_grpc, 

45 public_trips_pb2_grpc, 

46 references_pb2_grpc, 

47 reporting_pb2_grpc, 

48 requests_pb2_grpc, 

49 resources_pb2_grpc, 

50 search_pb2_grpc, 

51 stripe_pb2_grpc, 

52 threads_pb2_grpc, 

53) 

54from couchers.servicers.account import Account, Iris 

55from couchers.servicers.admin import Admin 

56from couchers.servicers.api import API 

57from couchers.servicers.auth import Auth 

58from couchers.servicers.blocking import Blocking 

59from couchers.servicers.bugs import Bugs 

60from couchers.servicers.communities import Communities 

61from couchers.servicers.conversations import Conversations 

62from couchers.servicers.discussions import Discussions 

63from couchers.servicers.donations import Donations, Stripe 

64from couchers.servicers.editor import Editor 

65from couchers.servicers.events import Events 

66from couchers.servicers.galleries import Galleries 

67from couchers.servicers.gis import GIS 

68from couchers.servicers.groups import Groups 

69from couchers.servicers.jail import Jail 

70from couchers.servicers.media import Media, get_media_auth_interceptor 

71from couchers.servicers.moderation import Moderation 

72from couchers.servicers.notifications import Notifications 

73from couchers.servicers.pages import Pages 

74from couchers.servicers.postal_verification import PostalVerification 

75from couchers.servicers.public import Public 

76from couchers.servicers.public_trips import PublicTrips 

77from couchers.servicers.references import References 

78from couchers.servicers.reporting import Reporting 

79from couchers.servicers.requests import Requests 

80from couchers.servicers.resources import Resources 

81from couchers.servicers.search import Search 

82from couchers.servicers.threads import Threads 

83 

84 

85class _MockCouchersContext: 

86 @property 

87 def headers(self): 

88 return {} 

89 

90 

91class CookieMetadataPlugin(grpc.AuthMetadataPlugin): 

92 """ 

93 Injects the right `cookie: couchers-sesh=...` header into the metadata 

94 """ 

95 

96 def __init__(self, token: str): 

97 self.token = token 

98 

99 def __call__(self, context, callback) -> None: 

100 callback((("cookie", f"couchers-sesh={self.token}"),), None) 

101 

102 

103class MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor): 

104 def __init__(self): 

105 self.latest_headers = {} 

106 

107 def intercept_unary_unary(self, continuation, client_call_details, request): 

108 call = continuation(client_call_details, request) 

109 self.latest_headers = dict(call.initial_metadata()) 

110 self.latest_header_raw = call.initial_metadata() 

111 return call 

112 

113 

114class FakeRpcError(grpc.RpcError): 

115 def __init__(self, code: grpc.StatusCode, details: str): 

116 self._code = code 

117 self._details = details 

118 

119 def code(self) -> grpc.StatusCode: 

120 return self._code 

121 

122 def details(self) -> str: 

123 return self._details 

124 

125 

126class MockGrpcContext: 

127 """ 

128 Pure mock of grpc.ServicerContext for testing. 

129 """ 

130 

131 def __init__(self): 

132 self._initial_metadata = [] 

133 self._invocation_metadata: list[tuple[str, str]] = [] 

134 

135 def abort(self, code: grpc.StatusCode, details: str) -> NoReturn: 

136 raise FakeRpcError(code, details) 

137 

138 def invocation_metadata(self) -> list[tuple[str, str]]: 

139 return self._invocation_metadata 

140 

141 def send_initial_metadata(self, metadata): 

142 self._initial_metadata.extend(metadata) 

143 

144 

145class FakeChannel: 

146 """ 

147 Mock gRPC channel for testing that orchestrates context creation. 

148 

149 This holds the test state (token) and creates proper CouchersContext 

150 instances when handlers are invoked. 

151 """ 

152 

153 def __init__(self, token: str | None = None): 

154 self.handlers: dict[str, Any] = {} 

155 self._token = token 

156 self._pool = get_descriptor_pool() 

157 

158 def add_generic_rpc_handlers(self, generic_rpc_handlers: Any): 

159 _validate_generic_rpc_handlers(generic_rpc_handlers) 

160 self.handlers.update(generic_rpc_handlers[0]._method_handlers) 

161 

162 def unary_unary(self, method, request_serializer, response_deserializer): 

163 handler = self.handlers[method] 

164 

165 def fake_handler(request): 

166 auth_info = _try_get_and_update_user_details( 

167 self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent" 

168 ) 

169 auth_level = find_auth_level(self._pool, method) 

170 check_permissions(auth_info, auth_level) 

171 

172 # Do a full serialization cycle on the request and the 

173 # response to catch accidental use of unserializable data. 

174 request = handler.request_deserializer(request_serializer(request)) 

175 

176 with session_scope() as session: 

177 context = make_interactive_context( 

178 grpc_context=MockGrpcContext(), 

179 user_id=auth_info.user_id if auth_info else None, 

180 is_api_key=False, 

181 token=self._token if auth_info else None, 

182 localization=LocalizationContext( 

183 locale=(auth_info and auth_info.ui_language_preference) or DEFAULT_LOCALE, 

184 timezone=ZoneInfo((auth_info and auth_info.timezone) or "Etc/UTC"), 

185 ), 

186 ) 

187 

188 response = handler.unary_unary(request, context, session) 

189 

190 return response_deserializer(handler.response_serializer(response)) 

191 

192 return fake_handler 

193 

194 

195@contextmanager 

196def run_server(grpc_channel_options=(), token: str | None = None): 

197 with futures.ThreadPoolExecutor(1) as executor: 

198 if token: 

199 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

200 creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

201 else: 

202 creds = grpc.local_channel_credentials() 

203 

204 srv = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

205 port = srv.add_secure_port("localhost:0", grpc.local_server_credentials()) 

206 srv.start() 

207 

208 try: 

209 with grpc.secure_channel(f"localhost:{port}", creds, options=grpc_channel_options) as channel: 

210 metadata_interceptor = MetadataKeeperInterceptor() 

211 channel = grpc.intercept_channel(channel, metadata_interceptor) 

212 yield srv, channel, metadata_interceptor 

213 finally: 

214 srv.stop(None).wait() 

215 

216 

217# Sessions that start a real GRPC server. 

218@contextmanager 

219def auth_api_session( 

220 grpc_channel_options=(), 

221) -> Generator[tuple[auth_pb2_grpc.AuthStub, MetadataKeeperInterceptor]]: 

222 """ 

223 Create an Auth API for testing 

224 

225 This needs to use the real server since it plays around with headers 

226 """ 

227 with run_server(grpc_channel_options) as (server, channel, metadata_interceptor): 

228 auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server) 

229 yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor 

230 

231 

232@contextmanager 

233def real_api_session(token: str): 

234 """ 

235 Create an API for testing, using TCP sockets, uses the token for auth 

236 """ 

237 with run_server(token=token) as (server, channel, metadata_interceptor): 

238 api_pb2_grpc.add_APIServicer_to_server(API(), server) 

239 yield api_pb2_grpc.APIStub(channel) 

240 

241 

242@contextmanager 

243def real_admin_session(token: str): 

244 """ 

245 Create an Admin service for testing, using TCP sockets, uses the token for auth 

246 """ 

247 with run_server(token=token) as (server, channel, metadata_interceptor): 

248 admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server) 

249 yield admin_pb2_grpc.AdminStub(channel) 

250 

251 

252@contextmanager 

253def real_editor_session(token: str): 

254 """ 

255 Create an Editor service for testing, using TCP sockets, uses the token for auth 

256 """ 

257 with run_server(token=token) as (server, channel, metadata_interceptor): 

258 editor_pb2_grpc.add_EditorServicer_to_server(Editor(), server) 

259 yield editor_pb2_grpc.EditorStub(channel) 

260 

261 

262@contextmanager 

263def real_moderation_session(token: str): 

264 """ 

265 Create a Moderation service for testing, using TCP sockets, uses the token for auth 

266 """ 

267 with run_server(token=token) as (server, channel, metadata_interceptor): 

268 moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), server) 

269 yield moderation_pb2_grpc.ModerationStub(channel) 

270 

271 

272@contextmanager 

273def real_account_session(token: str): 

274 """ 

275 Create an Account service for testing, using TCP sockets, uses the token for auth 

276 """ 

277 with run_server(token=token) as (server, channel, metadata_interceptor): 

278 account_pb2_grpc.add_AccountServicer_to_server(Account(), server) 

279 yield account_pb2_grpc.AccountStub(channel) 

280 

281 

282@contextmanager 

283def real_jail_session(token: str): 

284 """ 

285 Create a Jail service for testing, using TCP sockets, uses the token for auth 

286 """ 

287 with run_server(token=token) as (server, channel, metadata_interceptor): 

288 jail_pb2_grpc.add_JailServicer_to_server(Jail(), server) 

289 yield jail_pb2_grpc.JailStub(channel) 

290 

291 

292@contextmanager 

293def real_stripe_session(): 

294 """ 

295 Create a Stripe service for testing, using TCP sockets 

296 """ 

297 with run_server() as (server, channel, metadata_interceptor): 

298 stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server) 

299 yield stripe_pb2_grpc.StripeStub(channel) 

300 

301 

302@contextmanager 

303def real_iris_session(): 

304 with run_server() as (server, channel, metadata_interceptor): 

305 iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server) 

306 yield iris_pb2_grpc.IrisStub(channel) 

307 

308 

309@contextmanager 

310def media_session(bearer_token: str): 

311 """ 

312 Create a fresh Media API for testing, uses the bearer token for media auth 

313 """ 

314 media_auth_interceptor = get_media_auth_interceptor(bearer_token) 

315 

316 with futures.ThreadPoolExecutor(1) as executor: 

317 server = grpc.server(executor, interceptors=[media_auth_interceptor]) 

318 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

319 media_pb2_grpc.add_MediaServicer_to_server(Media(), server) 

320 server.start() 

321 

322 call_creds = grpc.access_token_call_credentials(bearer_token) 

323 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

324 

325 try: 

326 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

327 yield media_pb2_grpc.MediaStub(channel) 

328 finally: 

329 server.stop(None).wait() 

330 

331 

332# Sessions that don't need to start a real GRPC server. 

333# Note: these don't need to be context managers, but they are so that 

334# we can switch to a real implementation if needed. 

335@contextmanager 

336def api_session(token: str): 

337 """ 

338 Create an API for testing, uses the token for auth 

339 """ 

340 channel = FakeChannel(token) 

341 api_pb2_grpc.add_APIServicer_to_server(API(), channel) 

342 yield api_pb2_grpc.APIStub(channel) 

343 

344 

345@contextmanager 

346def gis_session(token: str): 

347 channel = FakeChannel(token) 

348 gis_pb2_grpc.add_GISServicer_to_server(GIS(), channel) 

349 yield gis_pb2_grpc.GISStub(channel) 

350 

351 

352@contextmanager 

353def public_session(): 

354 channel = FakeChannel() 

355 public_pb2_grpc.add_PublicServicer_to_server(Public(), channel) 

356 yield public_pb2_grpc.PublicStub(channel) 

357 

358 

359@contextmanager 

360def public_trips_session(token: str): 

361 channel = FakeChannel(token) 

362 public_trips_pb2_grpc.add_PublicTripsServicer_to_server(PublicTrips(), channel) 

363 yield public_trips_pb2_grpc.PublicTripsStub(channel) 

364 

365 

366@contextmanager 

367def conversations_session(token: str): 

368 """ 

369 Create a Conversations API for testing, uses the token for auth 

370 """ 

371 channel = FakeChannel(token) 

372 conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), channel) 

373 yield conversations_pb2_grpc.ConversationsStub(channel) 

374 

375 

376@contextmanager 

377def requests_session(token: str): 

378 """ 

379 Create a Requests API for testing, uses the token for auth 

380 """ 

381 channel = FakeChannel(token) 

382 requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), channel) 

383 yield requests_pb2_grpc.RequestsStub(channel) 

384 

385 

386@contextmanager 

387def threads_session(token: str): 

388 channel = FakeChannel(token) 

389 threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), channel) 

390 yield threads_pb2_grpc.ThreadsStub(channel) 

391 

392 

393@contextmanager 

394def discussions_session(token: str): 

395 channel = FakeChannel(token) 

396 discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), channel) 

397 yield discussions_pb2_grpc.DiscussionsStub(channel) 

398 

399 

400@contextmanager 

401def donations_session(token: str): 

402 channel = FakeChannel(token) 

403 donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), channel) 

404 yield donations_pb2_grpc.DonationsStub(channel) 

405 

406 

407@contextmanager 

408def pages_session(token: str): 

409 channel = FakeChannel(token) 

410 pages_pb2_grpc.add_PagesServicer_to_server(Pages(), channel) 

411 yield pages_pb2_grpc.PagesStub(channel) 

412 

413 

414@contextmanager 

415def communities_session(token: str): 

416 channel = FakeChannel(token) 

417 communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), channel) 

418 yield communities_pb2_grpc.CommunitiesStub(channel) 

419 

420 

421@contextmanager 

422def groups_session(token: str): 

423 channel = FakeChannel(token) 

424 groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), channel) 

425 yield groups_pb2_grpc.GroupsStub(channel) 

426 

427 

428@contextmanager 

429def blocking_session(token: str): 

430 channel = FakeChannel(token) 

431 blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), channel) 

432 yield blocking_pb2_grpc.BlockingStub(channel) 

433 

434 

435@contextmanager 

436def notifications_session(token: str): 

437 channel = FakeChannel(token) 

438 notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), channel) 

439 yield notifications_pb2_grpc.NotificationsStub(channel) 

440 

441 

442@contextmanager 

443def account_session(token: str): 

444 """ 

445 Create a Account API for testing, uses the token for auth 

446 """ 

447 channel = FakeChannel(token) 

448 account_pb2_grpc.add_AccountServicer_to_server(Account(), channel) 

449 yield account_pb2_grpc.AccountStub(channel) 

450 

451 

452@contextmanager 

453def search_session(token: str): 

454 """ 

455 Create a Search API for testing, uses the token for auth 

456 """ 

457 channel = FakeChannel(token) 

458 search_pb2_grpc.add_SearchServicer_to_server(Search(), channel) 

459 yield search_pb2_grpc.SearchStub(channel) 

460 

461 

462@contextmanager 

463def references_session(token: str): 

464 """ 

465 Create a References API for testing, uses the token for auth 

466 """ 

467 channel = FakeChannel(token) 

468 references_pb2_grpc.add_ReferencesServicer_to_server(References(), channel) 

469 yield references_pb2_grpc.ReferencesStub(channel) 

470 

471 

472@contextmanager 

473def galleries_session(token: str): 

474 """ 

475 Create a Galleries API for testing, uses the token for auth 

476 """ 

477 channel = FakeChannel(token) 

478 galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), channel) 

479 yield galleries_pb2_grpc.GalleriesStub(channel) 

480 

481 

482@contextmanager 

483def reporting_session(token: str): 

484 channel = FakeChannel(token) 

485 reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), channel) 

486 yield reporting_pb2_grpc.ReportingStub(channel) 

487 

488 

489@contextmanager 

490def events_session(token: str): 

491 channel = FakeChannel(token) 

492 events_pb2_grpc.add_EventsServicer_to_server(Events(), channel) 

493 yield events_pb2_grpc.EventsStub(channel) 

494 

495 

496@contextmanager 

497def postal_verification_session(token: str): 

498 channel = FakeChannel(token) 

499 postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), channel) 

500 yield postal_verification_pb2_grpc.PostalVerificationStub(channel) 

501 

502 

503@contextmanager 

504def bugs_session(token: str | None = None): 

505 channel = FakeChannel(token) 

506 bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), channel) 

507 yield bugs_pb2_grpc.BugsStub(channel) 

508 

509 

510@contextmanager 

511def resources_session(): 

512 channel = FakeChannel() 

513 resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), channel) 

514 yield resources_pb2_grpc.ResourcesStub(channel)