Coverage for app / backend / src / tests / fixtures / sessions.py: 99%

279 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1from collections.abc import Generator 

2from concurrent import futures 

3from contextlib import contextmanager 

4from typing import Any, NoReturn 

5 

6import grpc 

7from grpc._server import _validate_generic_rpc_handlers 

8 

9from couchers.context import make_interactive_context 

10from couchers.db import session_scope 

11from couchers.descriptor_pool import get_descriptor_pool 

12from couchers.interceptors import ( 

13 CouchersMiddlewareInterceptor, 

14 _try_get_and_update_user_details, 

15 check_permissions, 

16 find_auth_level, 

17) 

18from couchers.proto import ( 

19 account_pb2_grpc, 

20 admin_pb2_grpc, 

21 api_pb2_grpc, 

22 auth_pb2_grpc, 

23 blocking_pb2_grpc, 

24 bugs_pb2_grpc, 

25 communities_pb2_grpc, 

26 conversations_pb2_grpc, 

27 discussions_pb2_grpc, 

28 donations_pb2_grpc, 

29 editor_pb2_grpc, 

30 events_pb2_grpc, 

31 galleries_pb2_grpc, 

32 gis_pb2_grpc, 

33 groups_pb2_grpc, 

34 iris_pb2_grpc, 

35 jail_pb2_grpc, 

36 media_pb2_grpc, 

37 moderation_pb2_grpc, 

38 notifications_pb2_grpc, 

39 pages_pb2_grpc, 

40 postal_verification_pb2_grpc, 

41 public_pb2_grpc, 

42 references_pb2_grpc, 

43 reporting_pb2_grpc, 

44 requests_pb2_grpc, 

45 resources_pb2_grpc, 

46 search_pb2_grpc, 

47 stripe_pb2_grpc, 

48 threads_pb2_grpc, 

49) 

50from couchers.servicers.account import Account, Iris 

51from couchers.servicers.admin import Admin 

52from couchers.servicers.api import API 

53from couchers.servicers.auth import Auth 

54from couchers.servicers.blocking import Blocking 

55from couchers.servicers.bugs import Bugs 

56from couchers.servicers.communities import Communities 

57from couchers.servicers.conversations import Conversations 

58from couchers.servicers.discussions import Discussions 

59from couchers.servicers.donations import Donations, Stripe 

60from couchers.servicers.editor import Editor 

61from couchers.servicers.events import Events 

62from couchers.servicers.galleries import Galleries 

63from couchers.servicers.gis import GIS 

64from couchers.servicers.groups import Groups 

65from couchers.servicers.jail import Jail 

66from couchers.servicers.media import Media, get_media_auth_interceptor 

67from couchers.servicers.moderation import Moderation 

68from couchers.servicers.notifications import Notifications 

69from couchers.servicers.pages import Pages 

70from couchers.servicers.postal_verification import PostalVerification 

71from couchers.servicers.public import Public 

72from couchers.servicers.references import References 

73from couchers.servicers.reporting import Reporting 

74from couchers.servicers.requests import Requests 

75from couchers.servicers.resources import Resources 

76from couchers.servicers.search import Search 

77from couchers.servicers.threads import Threads 

78 

79 

80class _MockCouchersContext: 

81 @property 

82 def headers(self): 

83 return {} 

84 

85 

class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Auth metadata plugin that supplies a `cookie: couchers-sesh=<token>`
    metadata entry.
    """

    def __init__(self, token: str):
        # Session token that is embedded in the cookie value.
        self.token = token

    def __call__(self, context, callback) -> None:
        # Report exactly one metadata pair and no error to the callback.
        cookie_value = f"couchers-sesh={self.token}"
        metadata = (("cookie", cookie_value),)
        callback(metadata, None)

96 

97 

class MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Client interceptor that remembers the initial metadata of the most recent
    unary-unary call so that tests can inspect it afterwards.

    Attributes:
        latest_headers: the initial metadata of the last call as a dict.
        latest_header_raw: the same metadata exactly as returned by the call.
    """

    def __init__(self):
        # Define both attributes up front so that reading either of them
        # before any call has been intercepted does not raise AttributeError.
        self.latest_headers = {}
        self.latest_header_raw = ()

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Run the call, record its initial metadata and return the call."""
        call = continuation(client_call_details, request)
        # Fetch the metadata once and derive the dict from that same value so
        # both attributes always describe the same snapshot.
        raw_metadata = call.initial_metadata()
        headers = dict(raw_metadata)
        self.latest_headers = headers
        self.latest_header_raw = raw_metadata
        return call

107 

108 

class FakeRpcError(grpc.RpcError):
    """
    RpcError carrying a status code and a details string, retrievable through
    the `code()` and `details()` accessors.
    """

    def __init__(self, code: grpc.StatusCode, details: str):
        # Forward the values to the exception base class so that str()/repr()
        # (and therefore test failure output) show what went wrong instead of
        # an empty message.
        super().__init__(code, details)
        self._code = code
        self._details = details

    def code(self) -> grpc.StatusCode:
        """Return the status code this error was created with."""
        return self._code

    def details(self) -> str:
        """Return the details string this error was created with."""
        return self._details

119 

120 

class MockGrpcContext:
    """
    In-memory substitute for grpc.ServicerContext, for use in tests.

    It collects any initial metadata sent by a handler and turns `abort`
    into a raised FakeRpcError.
    """

    def __init__(self):
        # Metadata reported by `invocation_metadata`; starts out empty.
        self._invocation_metadata: list[tuple[str, str]] = []
        # Everything passed to `send_initial_metadata`, in call order.
        self._initial_metadata = []

    def invocation_metadata(self) -> list[tuple[str, str]]:
        """Return the stored invocation metadata list."""
        return self._invocation_metadata

    def send_initial_metadata(self, metadata):
        """Append all entries of `metadata` to the recorded initial metadata."""
        self._initial_metadata += metadata

    def abort(self, code: grpc.StatusCode, details: str) -> NoReturn:
        """Raise a FakeRpcError built from `code` and `details`."""
        error = FakeRpcError(code, details)
        raise error

138 

139 

class FakeChannel:
    """
    Mock gRPC channel for testing that orchestrates context creation.

    This holds the test state (token) and creates proper CouchersContext
    instances when handlers are invoked. Servicers are registered on it via
    `add_generic_rpc_handlers` and stubs obtain their callables from
    `unary_unary`, so a stub call invokes the servicer method directly.
    """

    def __init__(self, token: str | None = None):
        # Registered method handlers, keyed by the method name that
        # `unary_unary` later receives.
        self.handlers: dict[str, Any] = {}
        self._token = token
        self._pool = get_descriptor_pool()

    def add_generic_rpc_handlers(self, generic_rpc_handlers: Any):
        """Register the method handlers of every supplied generic handler."""
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        # Register all handlers rather than only the first, so that passing
        # several of them does not silently drop the rest.
        for generic_handler in generic_rpc_handlers:
            self.handlers.update(generic_handler._method_handlers)

    def unary_unary(self, method, request_serializer, response_deserializer):
        """
        Return a callable that runs the handler registered for `method`.

        Raises KeyError if no handler has been registered for `method`.
        """
        handler = self.handlers[method]

        def fake_handler(request):
            # Authenticate and authorize first, using the channel's token.
            auth_info = _try_get_and_update_user_details(
                self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent"
            )
            auth_level = find_auth_level(self._pool, method)
            check_permissions(auth_info, auth_level)

            # Do a full serialization cycle on the request and the
            # response to catch accidental use of unserializable data.
            request = handler.request_deserializer(request_serializer(request))

            with session_scope() as session:
                # Identity fields are only filled in when authentication
                # produced user details.
                context = make_interactive_context(
                    grpc_context=MockGrpcContext(),
                    user_id=auth_info.user_id if auth_info else None,
                    is_api_key=False,
                    token=self._token if auth_info else None,
                    ui_language_preference=auth_info.ui_language_preference if auth_info else None,
                )

                response = handler.unary_unary(request, context, session)

            return response_deserializer(handler.response_serializer(response))

        return fake_handler

185 

186 

@contextmanager
def run_server(grpc_channel_options=(), token: str | None = None):
    """
    Start a gRPC server on a free localhost port and open a channel to it.

    Yields a `(server, channel, metadata_interceptor)` triple. When `token`
    is truthy, the channel's credentials include a CookieMetadataPlugin for
    it. The server is stopped when the context exits.

    NOTE(review): the server is already started when it is yielded, and the
    callers in this file register their servicers afterwards. The gRPC docs
    appear to describe handler registration as only safe before start —
    confirm this ordering is intended.
    """
    with futures.ThreadPoolExecutor(1) as executor:
        channel_creds = grpc.local_channel_credentials()
        if token:
            cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
            channel_creds = grpc.composite_channel_credentials(channel_creds, cookie_creds)

        server = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()])
        # Port 0 lets the OS choose; the bound port is returned.
        bound_port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        server.start()

        try:
            target = f"localhost:{bound_port}"
            with grpc.secure_channel(target, channel_creds, options=grpc_channel_options) as raw_channel:
                keeper = MetadataKeeperInterceptor()
                yield server, grpc.intercept_channel(raw_channel, keeper), keeper
        finally:
            server.stop(None).wait()

207 

208 

209# Sessions that start a real GRPC server. 

@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, MetadataKeeperInterceptor]]:
    """
    Yield an Auth stub together with the interceptor that records headers.

    This goes through the real server since it plays around with headers.
    """
    with run_server(grpc_channel_options) as (srv, chan, header_keeper):
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), srv)
        stub = auth_pb2_grpc.AuthStub(chan)
        yield stub, header_keeper

222 

223 

@contextmanager
def real_api_session(token: str):
    """
    Yield an API stub that talks to a real server over TCP sockets,
    using `token` for auth.
    """
    with run_server(token=token) as (srv, chan, _):
        api_pb2_grpc.add_APIServicer_to_server(API(), srv)
        stub = api_pb2_grpc.APIStub(chan)
        yield stub

232 

233 

@contextmanager
def real_admin_session(token: str):
    """
    Yield an Admin stub that talks to a real server over TCP sockets,
    using `token` for auth.
    """
    with run_server(token=token) as (srv, chan, _):
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), srv)
        stub = admin_pb2_grpc.AdminStub(chan)
        yield stub

242 

243 

@contextmanager
def real_editor_session(token: str):
    """
    Yield an Editor stub that talks to a real server over TCP sockets,
    using `token` for auth.
    """
    with run_server(token=token) as (srv, chan, _):
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), srv)
        stub = editor_pb2_grpc.EditorStub(chan)
        yield stub

252 

253 

@contextmanager
def real_moderation_session(token: str):
    """
    Yield a Moderation stub that talks to a real server over TCP sockets,
    using `token` for auth.
    """
    with run_server(token=token) as (srv, chan, _):
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), srv)
        stub = moderation_pb2_grpc.ModerationStub(chan)
        yield stub

262 

263 

@contextmanager
def real_account_session(token: str):
    """
    Yield an Account stub that talks to a real server over TCP sockets,
    using `token` for auth.
    """
    with run_server(token=token) as (srv, chan, _):
        account_pb2_grpc.add_AccountServicer_to_server(Account(), srv)
        stub = account_pb2_grpc.AccountStub(chan)
        yield stub

272 

273 

@contextmanager
def real_jail_session(token: str):
    """
    Yield a Jail stub that talks to a real server over TCP sockets,
    using `token` for auth.
    """
    with run_server(token=token) as (srv, chan, _):
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), srv)
        stub = jail_pb2_grpc.JailStub(chan)
        yield stub

282 

283 

@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub that talks to a real server over TCP sockets,
    without any auth token.
    """
    with run_server() as (srv, chan, _):
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), srv)
        stub = stripe_pb2_grpc.StripeStub(chan)
        yield stub

292 

293 

@contextmanager
def real_iris_session():
    """
    Yield an Iris stub that talks to a server started by `run_server`,
    without any auth token.
    """
    with run_server() as (srv, chan, _):
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), srv)
        stub = iris_pb2_grpc.IrisStub(chan)
        yield stub

299 

300 

@contextmanager
def media_session(bearer_token: str):
    """
    Yield a Media stub connected to a freshly started Media server.

    `bearer_token` is used both to build the server's auth interceptor and
    as the access token in the channel's call credentials. The server is
    stopped when the context exits.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        media_server = grpc.server(pool, interceptors=[auth_interceptor])
        # Port 0 lets the OS choose; the bound port is returned.
        bound_port = media_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), media_server)
        media_server.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            target = f"localhost:{bound_port}"
            with grpc.secure_channel(target, channel_creds) as chan:
                yield media_pb2_grpc.MediaStub(chan)
        finally:
            media_server.stop(None).wait()

322 

323 

324# Sessions that don't need to start a real GRPC server. 

325# Note: these don't need to be context managers, but they are so that 

326# we can switch to a real implementation if needed. 

@contextmanager
def api_session(token: str):
    """
    Yield an API stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake_channel)
    stub = api_pb2_grpc.APIStub(fake_channel)
    yield stub

335 

336 

@contextmanager
def gis_session(token: str):
    """
    Yield a GIS stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake_channel)
    stub = gis_pb2_grpc.GISStub(fake_channel)
    yield stub

342 

343 

@contextmanager
def public_session():
    """
    Yield a Public stub backed by a FakeChannel created without a token.
    """
    fake_channel = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake_channel)
    stub = public_pb2_grpc.PublicStub(fake_channel)
    yield stub

349 

350 

@contextmanager
def conversations_session(token: str):
    """
    Yield a Conversations stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake_channel)
    stub = conversations_pb2_grpc.ConversationsStub(fake_channel)
    yield stub

359 

360 

@contextmanager
def requests_session(token: str):
    """
    Yield a Requests stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake_channel)
    stub = requests_pb2_grpc.RequestsStub(fake_channel)
    yield stub

369 

370 

@contextmanager
def threads_session(token: str):
    """
    Yield a Threads stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake_channel)
    stub = threads_pb2_grpc.ThreadsStub(fake_channel)
    yield stub

376 

377 

@contextmanager
def discussions_session(token: str):
    """
    Yield a Discussions stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake_channel)
    stub = discussions_pb2_grpc.DiscussionsStub(fake_channel)
    yield stub

383 

384 

@contextmanager
def donations_session(token: str):
    """
    Yield a Donations stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake_channel)
    stub = donations_pb2_grpc.DonationsStub(fake_channel)
    yield stub

390 

391 

@contextmanager
def pages_session(token: str):
    """
    Yield a Pages stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake_channel)
    stub = pages_pb2_grpc.PagesStub(fake_channel)
    yield stub

397 

398 

@contextmanager
def communities_session(token: str):
    """
    Yield a Communities stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake_channel)
    stub = communities_pb2_grpc.CommunitiesStub(fake_channel)
    yield stub

404 

405 

@contextmanager
def groups_session(token: str):
    """
    Yield a Groups stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake_channel)
    stub = groups_pb2_grpc.GroupsStub(fake_channel)
    yield stub

411 

412 

@contextmanager
def blocking_session(token: str):
    """
    Yield a Blocking stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake_channel)
    stub = blocking_pb2_grpc.BlockingStub(fake_channel)
    yield stub

418 

419 

@contextmanager
def notifications_session(token: str):
    """
    Yield a Notifications stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake_channel)
    stub = notifications_pb2_grpc.NotificationsStub(fake_channel)
    yield stub

425 

426 

@contextmanager
def account_session(token: str):
    """
    Yield an Account stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake_channel)
    stub = account_pb2_grpc.AccountStub(fake_channel)
    yield stub

435 

436 

@contextmanager
def search_session(token: str):
    """
    Yield a Search stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake_channel)
    stub = search_pb2_grpc.SearchStub(fake_channel)
    yield stub

445 

446 

@contextmanager
def references_session(token: str):
    """
    Yield a References stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake_channel)
    stub = references_pb2_grpc.ReferencesStub(fake_channel)
    yield stub

455 

456 

@contextmanager
def galleries_session(token: str):
    """
    Yield a Galleries stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake_channel)
    stub = galleries_pb2_grpc.GalleriesStub(fake_channel)
    yield stub

465 

466 

@contextmanager
def reporting_session(token: str):
    """
    Yield a Reporting stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake_channel)
    stub = reporting_pb2_grpc.ReportingStub(fake_channel)
    yield stub

472 

473 

@contextmanager
def events_session(token: str):
    """
    Yield an Events stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake_channel)
    stub = events_pb2_grpc.EventsStub(fake_channel)
    yield stub

479 

480 

@contextmanager
def postal_verification_session(token: str):
    """
    Yield a PostalVerification stub backed by a FakeChannel, using `token` for auth.
    """
    fake_channel = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), fake_channel)
    stub = postal_verification_pb2_grpc.PostalVerificationStub(fake_channel)
    yield stub

486 

487 

@contextmanager
def bugs_session(token: str | None = None):
    """
    Yield a Bugs stub backed by a FakeChannel; `token` is optional and is
    passed to the channel as given.
    """
    fake_channel = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake_channel)
    stub = bugs_pb2_grpc.BugsStub(fake_channel)
    yield stub

493 

494 

@contextmanager
def resources_session():
    """
    Yield a Resources stub backed by a FakeChannel created without a token.
    """
    fake_channel = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake_channel)
    stub = resources_pb2_grpc.ResourcesStub(fake_channel)
    yield stub