Coverage for app / backend / src / tests / fixtures / sessions.py: 99%

282 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1from collections.abc import Generator 

2from concurrent import futures 

3from contextlib import contextmanager 

4from typing import Any, NoReturn 

5from zoneinfo import ZoneInfo 

6 

7import grpc 

8from grpc._server import _validate_generic_rpc_handlers 

9 

10from couchers.context import make_interactive_context 

11from couchers.db import session_scope 

12from couchers.descriptor_pool import get_descriptor_pool 

13from couchers.i18n import LocalizationContext 

14from couchers.i18n.locales import DEFAULT_LOCALE 

15from couchers.interceptors import ( 

16 CouchersMiddlewareInterceptor, 

17 _try_get_and_update_user_details, 

18 check_permissions, 

19 find_auth_level, 

20) 

21from couchers.proto import ( 

22 account_pb2_grpc, 

23 admin_pb2_grpc, 

24 api_pb2_grpc, 

25 auth_pb2_grpc, 

26 blocking_pb2_grpc, 

27 bugs_pb2_grpc, 

28 communities_pb2_grpc, 

29 conversations_pb2_grpc, 

30 discussions_pb2_grpc, 

31 donations_pb2_grpc, 

32 editor_pb2_grpc, 

33 events_pb2_grpc, 

34 galleries_pb2_grpc, 

35 gis_pb2_grpc, 

36 groups_pb2_grpc, 

37 iris_pb2_grpc, 

38 jail_pb2_grpc, 

39 media_pb2_grpc, 

40 moderation_pb2_grpc, 

41 notifications_pb2_grpc, 

42 pages_pb2_grpc, 

43 postal_verification_pb2_grpc, 

44 public_pb2_grpc, 

45 references_pb2_grpc, 

46 reporting_pb2_grpc, 

47 requests_pb2_grpc, 

48 resources_pb2_grpc, 

49 search_pb2_grpc, 

50 stripe_pb2_grpc, 

51 threads_pb2_grpc, 

52) 

53from couchers.servicers.account import Account, Iris 

54from couchers.servicers.admin import Admin 

55from couchers.servicers.api import API 

56from couchers.servicers.auth import Auth 

57from couchers.servicers.blocking import Blocking 

58from couchers.servicers.bugs import Bugs 

59from couchers.servicers.communities import Communities 

60from couchers.servicers.conversations import Conversations 

61from couchers.servicers.discussions import Discussions 

62from couchers.servicers.donations import Donations, Stripe 

63from couchers.servicers.editor import Editor 

64from couchers.servicers.events import Events 

65from couchers.servicers.galleries import Galleries 

66from couchers.servicers.gis import GIS 

67from couchers.servicers.groups import Groups 

68from couchers.servicers.jail import Jail 

69from couchers.servicers.media import Media, get_media_auth_interceptor 

70from couchers.servicers.moderation import Moderation 

71from couchers.servicers.notifications import Notifications 

72from couchers.servicers.pages import Pages 

73from couchers.servicers.postal_verification import PostalVerification 

74from couchers.servicers.public import Public 

75from couchers.servicers.references import References 

76from couchers.servicers.reporting import Reporting 

77from couchers.servicers.requests import Requests 

78from couchers.servicers.resources import Resources 

79from couchers.servicers.search import Search 

80from couchers.servicers.threads import Threads 

81 

82 

83class _MockCouchersContext: 

84 @property 

85 def headers(self): 

86 return {} 

87 

88 

89class CookieMetadataPlugin(grpc.AuthMetadataPlugin): 

90 """ 

91 Injects the right `cookie: couchers-sesh=...` header into the metadata 

92 """ 

93 

94 def __init__(self, token: str): 

95 self.token = token 

96 

97 def __call__(self, context, callback) -> None: 

98 callback((("cookie", f"couchers-sesh={self.token}"),), None) 

99 

100 

101class MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor): 

102 def __init__(self): 

103 self.latest_headers = {} 

104 

105 def intercept_unary_unary(self, continuation, client_call_details, request): 

106 call = continuation(client_call_details, request) 

107 self.latest_headers = dict(call.initial_metadata()) 

108 self.latest_header_raw = call.initial_metadata() 

109 return call 

110 

111 

112class FakeRpcError(grpc.RpcError): 

113 def __init__(self, code: grpc.StatusCode, details: str): 

114 self._code = code 

115 self._details = details 

116 

117 def code(self) -> grpc.StatusCode: 

118 return self._code 

119 

120 def details(self) -> str: 

121 return self._details 

122 

123 

124class MockGrpcContext: 

125 """ 

126 Pure mock of grpc.ServicerContext for testing. 

127 """ 

128 

129 def __init__(self): 

130 self._initial_metadata = [] 

131 self._invocation_metadata: list[tuple[str, str]] = [] 

132 

133 def abort(self, code: grpc.StatusCode, details: str) -> NoReturn: 

134 raise FakeRpcError(code, details) 

135 

136 def invocation_metadata(self) -> list[tuple[str, str]]: 

137 return self._invocation_metadata 

138 

139 def send_initial_metadata(self, metadata): 

140 self._initial_metadata.extend(metadata) 

141 

142 

143class FakeChannel: 

144 """ 

145 Mock gRPC channel for testing that orchestrates context creation. 

146 

147 This holds the test state (token) and creates proper CouchersContext 

148 instances when handlers are invoked. 

149 """ 

150 

151 def __init__(self, token: str | None = None): 

152 self.handlers: dict[str, Any] = {} 

153 self._token = token 

154 self._pool = get_descriptor_pool() 

155 

156 def add_generic_rpc_handlers(self, generic_rpc_handlers: Any): 

157 _validate_generic_rpc_handlers(generic_rpc_handlers) 

158 self.handlers.update(generic_rpc_handlers[0]._method_handlers) 

159 

160 def unary_unary(self, method, request_serializer, response_deserializer): 

161 handler = self.handlers[method] 

162 

163 def fake_handler(request): 

164 auth_info = _try_get_and_update_user_details( 

165 self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent" 

166 ) 

167 auth_level = find_auth_level(self._pool, method) 

168 check_permissions(auth_info, auth_level) 

169 

170 # Do a full serialization cycle on the request and the 

171 # response to catch accidental use of unserializable data. 

172 request = handler.request_deserializer(request_serializer(request)) 

173 

174 with session_scope() as session: 

175 context = make_interactive_context( 

176 grpc_context=MockGrpcContext(), 

177 user_id=auth_info.user_id if auth_info else None, 

178 is_api_key=False, 

179 token=self._token if auth_info else None, 

180 localization=LocalizationContext( 

181 locale=(auth_info and auth_info.ui_language_preference) or DEFAULT_LOCALE, 

182 timezone=ZoneInfo((auth_info and auth_info.timezone) or "Etc/UTC"), 

183 ), 

184 ) 

185 

186 response = handler.unary_unary(request, context, session) 

187 

188 return response_deserializer(handler.response_serializer(response)) 

189 

190 return fake_handler 

191 

192 

193@contextmanager 

194def run_server(grpc_channel_options=(), token: str | None = None): 

195 with futures.ThreadPoolExecutor(1) as executor: 

196 if token: 

197 call_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token)) 

198 creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

199 else: 

200 creds = grpc.local_channel_credentials() 

201 

202 srv = grpc.server(executor, interceptors=[CouchersMiddlewareInterceptor()]) 

203 port = srv.add_secure_port("localhost:0", grpc.local_server_credentials()) 

204 srv.start() 

205 

206 try: 

207 with grpc.secure_channel(f"localhost:{port}", creds, options=grpc_channel_options) as channel: 

208 metadata_interceptor = MetadataKeeperInterceptor() 

209 channel = grpc.intercept_channel(channel, metadata_interceptor) 

210 yield srv, channel, metadata_interceptor 

211 finally: 

212 srv.stop(None).wait() 

213 

214 

215# Sessions that start a real GRPC server. 

216@contextmanager 

217def auth_api_session( 

218 grpc_channel_options=(), 

219) -> Generator[tuple[auth_pb2_grpc.AuthStub, MetadataKeeperInterceptor]]: 

220 """ 

221 Create an Auth API for testing 

222 

223 This needs to use the real server since it plays around with headers 

224 """ 

225 with run_server(grpc_channel_options) as (server, channel, metadata_interceptor): 

226 auth_pb2_grpc.add_AuthServicer_to_server(Auth(), server) 

227 yield auth_pb2_grpc.AuthStub(channel), metadata_interceptor 

228 

229 

230@contextmanager 

231def real_api_session(token: str): 

232 """ 

233 Create an API for testing, using TCP sockets, uses the token for auth 

234 """ 

235 with run_server(token=token) as (server, channel, metadata_interceptor): 

236 api_pb2_grpc.add_APIServicer_to_server(API(), server) 

237 yield api_pb2_grpc.APIStub(channel) 

238 

239 

240@contextmanager 

241def real_admin_session(token: str): 

242 """ 

243 Create an Admin service for testing, using TCP sockets, uses the token for auth 

244 """ 

245 with run_server(token=token) as (server, channel, metadata_interceptor): 

246 admin_pb2_grpc.add_AdminServicer_to_server(Admin(), server) 

247 yield admin_pb2_grpc.AdminStub(channel) 

248 

249 

250@contextmanager 

251def real_editor_session(token: str): 

252 """ 

253 Create an Editor service for testing, using TCP sockets, uses the token for auth 

254 """ 

255 with run_server(token=token) as (server, channel, metadata_interceptor): 

256 editor_pb2_grpc.add_EditorServicer_to_server(Editor(), server) 

257 yield editor_pb2_grpc.EditorStub(channel) 

258 

259 

260@contextmanager 

261def real_moderation_session(token: str): 

262 """ 

263 Create a Moderation service for testing, using TCP sockets, uses the token for auth 

264 """ 

265 with run_server(token=token) as (server, channel, metadata_interceptor): 

266 moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), server) 

267 yield moderation_pb2_grpc.ModerationStub(channel) 

268 

269 

270@contextmanager 

271def real_account_session(token: str): 

272 """ 

273 Create an Account service for testing, using TCP sockets, uses the token for auth 

274 """ 

275 with run_server(token=token) as (server, channel, metadata_interceptor): 

276 account_pb2_grpc.add_AccountServicer_to_server(Account(), server) 

277 yield account_pb2_grpc.AccountStub(channel) 

278 

279 

280@contextmanager 

281def real_jail_session(token: str): 

282 """ 

283 Create a Jail service for testing, using TCP sockets, uses the token for auth 

284 """ 

285 with run_server(token=token) as (server, channel, metadata_interceptor): 

286 jail_pb2_grpc.add_JailServicer_to_server(Jail(), server) 

287 yield jail_pb2_grpc.JailStub(channel) 

288 

289 

290@contextmanager 

291def real_stripe_session(): 

292 """ 

293 Create a Stripe service for testing, using TCP sockets 

294 """ 

295 with run_server() as (server, channel, metadata_interceptor): 

296 stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), server) 

297 yield stripe_pb2_grpc.StripeStub(channel) 

298 

299 

300@contextmanager 

301def real_iris_session(): 

302 with run_server() as (server, channel, metadata_interceptor): 

303 iris_pb2_grpc.add_IrisServicer_to_server(Iris(), server) 

304 yield iris_pb2_grpc.IrisStub(channel) 

305 

306 

307@contextmanager 

308def media_session(bearer_token: str): 

309 """ 

310 Create a fresh Media API for testing, uses the bearer token for media auth 

311 """ 

312 media_auth_interceptor = get_media_auth_interceptor(bearer_token) 

313 

314 with futures.ThreadPoolExecutor(1) as executor: 

315 server = grpc.server(executor, interceptors=[media_auth_interceptor]) 

316 port = server.add_secure_port("localhost:0", grpc.local_server_credentials()) 

317 media_pb2_grpc.add_MediaServicer_to_server(Media(), server) 

318 server.start() 

319 

320 call_creds = grpc.access_token_call_credentials(bearer_token) 

321 comp_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), call_creds) 

322 

323 try: 

324 with grpc.secure_channel(f"localhost:{port}", comp_creds) as channel: 

325 yield media_pb2_grpc.MediaStub(channel) 

326 finally: 

327 server.stop(None).wait() 

328 

329 

330# Sessions that don't need to start a real GRPC server. 

331# Note: these don't need to be context managers, but they are so that 

332# we can switch to a real implementation if needed. 

333@contextmanager 

334def api_session(token: str): 

335 """ 

336 Create an API for testing, uses the token for auth 

337 """ 

338 channel = FakeChannel(token) 

339 api_pb2_grpc.add_APIServicer_to_server(API(), channel) 

340 yield api_pb2_grpc.APIStub(channel) 

341 

342 

343@contextmanager 

344def gis_session(token: str): 

345 channel = FakeChannel(token) 

346 gis_pb2_grpc.add_GISServicer_to_server(GIS(), channel) 

347 yield gis_pb2_grpc.GISStub(channel) 

348 

349 

350@contextmanager 

351def public_session(): 

352 channel = FakeChannel() 

353 public_pb2_grpc.add_PublicServicer_to_server(Public(), channel) 

354 yield public_pb2_grpc.PublicStub(channel) 

355 

356 

357@contextmanager 

358def conversations_session(token: str): 

359 """ 

360 Create a Conversations API for testing, uses the token for auth 

361 """ 

362 channel = FakeChannel(token) 

363 conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), channel) 

364 yield conversations_pb2_grpc.ConversationsStub(channel) 

365 

366 

367@contextmanager 

368def requests_session(token: str): 

369 """ 

370 Create a Requests API for testing, uses the token for auth 

371 """ 

372 channel = FakeChannel(token) 

373 requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), channel) 

374 yield requests_pb2_grpc.RequestsStub(channel) 

375 

376 

377@contextmanager 

378def threads_session(token: str): 

379 channel = FakeChannel(token) 

380 threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), channel) 

381 yield threads_pb2_grpc.ThreadsStub(channel) 

382 

383 

384@contextmanager 

385def discussions_session(token: str): 

386 channel = FakeChannel(token) 

387 discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), channel) 

388 yield discussions_pb2_grpc.DiscussionsStub(channel) 

389 

390 

391@contextmanager 

392def donations_session(token: str): 

393 channel = FakeChannel(token) 

394 donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), channel) 

395 yield donations_pb2_grpc.DonationsStub(channel) 

396 

397 

398@contextmanager 

399def pages_session(token: str): 

400 channel = FakeChannel(token) 

401 pages_pb2_grpc.add_PagesServicer_to_server(Pages(), channel) 

402 yield pages_pb2_grpc.PagesStub(channel) 

403 

404 

405@contextmanager 

406def communities_session(token: str): 

407 channel = FakeChannel(token) 

408 communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), channel) 

409 yield communities_pb2_grpc.CommunitiesStub(channel) 

410 

411 

412@contextmanager 

413def groups_session(token: str): 

414 channel = FakeChannel(token) 

415 groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), channel) 

416 yield groups_pb2_grpc.GroupsStub(channel) 

417 

418 

419@contextmanager 

420def blocking_session(token: str): 

421 channel = FakeChannel(token) 

422 blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), channel) 

423 yield blocking_pb2_grpc.BlockingStub(channel) 

424 

425 

426@contextmanager 

427def notifications_session(token: str): 

428 channel = FakeChannel(token) 

429 notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), channel) 

430 yield notifications_pb2_grpc.NotificationsStub(channel) 

431 

432 

433@contextmanager 

434def account_session(token: str): 

435 """ 

436 Create a Account API for testing, uses the token for auth 

437 """ 

438 channel = FakeChannel(token) 

439 account_pb2_grpc.add_AccountServicer_to_server(Account(), channel) 

440 yield account_pb2_grpc.AccountStub(channel) 

441 

442 

443@contextmanager 

444def search_session(token: str): 

445 """ 

446 Create a Search API for testing, uses the token for auth 

447 """ 

448 channel = FakeChannel(token) 

449 search_pb2_grpc.add_SearchServicer_to_server(Search(), channel) 

450 yield search_pb2_grpc.SearchStub(channel) 

451 

452 

453@contextmanager 

454def references_session(token: str): 

455 """ 

456 Create a References API for testing, uses the token for auth 

457 """ 

458 channel = FakeChannel(token) 

459 references_pb2_grpc.add_ReferencesServicer_to_server(References(), channel) 

460 yield references_pb2_grpc.ReferencesStub(channel) 

461 

462 

463@contextmanager 

464def galleries_session(token: str): 

465 """ 

466 Create a Galleries API for testing, uses the token for auth 

467 """ 

468 channel = FakeChannel(token) 

469 galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), channel) 

470 yield galleries_pb2_grpc.GalleriesStub(channel) 

471 

472 

473@contextmanager 

474def reporting_session(token: str): 

475 channel = FakeChannel(token) 

476 reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), channel) 

477 yield reporting_pb2_grpc.ReportingStub(channel) 

478 

479 

480@contextmanager 

481def events_session(token: str): 

482 channel = FakeChannel(token) 

483 events_pb2_grpc.add_EventsServicer_to_server(Events(), channel) 

484 yield events_pb2_grpc.EventsStub(channel) 

485 

486 

487@contextmanager 

488def postal_verification_session(token: str): 

489 channel = FakeChannel(token) 

490 postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), channel) 

491 yield postal_verification_pb2_grpc.PostalVerificationStub(channel) 

492 

493 

494@contextmanager 

495def bugs_session(token: str | None = None): 

496 channel = FakeChannel(token) 

497 bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), channel) 

498 yield bugs_pb2_grpc.BugsStub(channel) 

499 

500 

501@contextmanager 

502def resources_session(): 

503 channel = FakeChannel() 

504 resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), channel) 

505 yield resources_pb2_grpc.ResourcesStub(channel)