Coverage for src / tests / fixtures / sessions.py: 99%

289 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-07 16:21 +0000

1from collections.abc import Generator 

2from concurrent import futures 

3from contextlib import contextmanager 

4from typing import Any 

5 

6import grpc 

7from grpc._server import _validate_generic_rpc_handlers 

8 

9from couchers.context import make_interactive_context 

10from couchers.db import session_scope 

11from couchers.descriptor_pool import get_descriptor_pool 

12from couchers.interceptors import CouchersMiddlewareInterceptor, UserAuthInfo, _try_get_and_update_user_details 

13from couchers.proto import ( 

14 account_pb2_grpc, 

15 admin_pb2_grpc, 

16 annotations_pb2, 

17 api_pb2_grpc, 

18 auth_pb2_grpc, 

19 blocking_pb2_grpc, 

20 bugs_pb2_grpc, 

21 communities_pb2_grpc, 

22 conversations_pb2_grpc, 

23 discussions_pb2_grpc, 

24 donations_pb2_grpc, 

25 editor_pb2_grpc, 

26 events_pb2_grpc, 

27 galleries_pb2_grpc, 

28 gis_pb2_grpc, 

29 groups_pb2_grpc, 

30 iris_pb2_grpc, 

31 jail_pb2_grpc, 

32 media_pb2_grpc, 

33 moderation_pb2_grpc, 

34 notifications_pb2_grpc, 

35 pages_pb2_grpc, 

36 postal_verification_pb2_grpc, 

37 public_pb2_grpc, 

38 references_pb2_grpc, 

39 reporting_pb2_grpc, 

40 requests_pb2_grpc, 

41 resources_pb2_grpc, 

42 search_pb2_grpc, 

43 stripe_pb2_grpc, 

44 threads_pb2_grpc, 

45) 

46from couchers.servicers.account import Account, Iris 

47from couchers.servicers.admin import Admin 

48from couchers.servicers.api import API 

49from couchers.servicers.auth import Auth 

50from couchers.servicers.blocking import Blocking 

51from couchers.servicers.bugs import Bugs 

52from couchers.servicers.communities import Communities 

53from couchers.servicers.conversations import Conversations 

54from couchers.servicers.discussions import Discussions 

55from couchers.servicers.donations import Donations, Stripe 

56from couchers.servicers.editor import Editor 

57from couchers.servicers.events import Events 

58from couchers.servicers.galleries import Galleries 

59from couchers.servicers.gis import GIS 

60from couchers.servicers.groups import Groups 

61from couchers.servicers.jail import Jail 

62from couchers.servicers.media import Media, get_media_auth_interceptor 

63from couchers.servicers.moderation import Moderation 

64from couchers.servicers.notifications import Notifications 

65from couchers.servicers.pages import Pages 

66from couchers.servicers.postal_verification import PostalVerification 

67from couchers.servicers.public import Public 

68from couchers.servicers.references import References 

69from couchers.servicers.reporting import Reporting 

70from couchers.servicers.requests import Requests 

71from couchers.servicers.resources import Resources 

72from couchers.servicers.search import Search 

73from couchers.servicers.threads import Threads 

74 

75 

class _MockCouchersContext:
    """Minimal context stand-in whose only member is an empty ``headers`` mapping."""

    @property
    def headers(self):
        """Return a new, empty dict on every access."""
        return dict()

80 

81 

class CookieMetadataPlugin(grpc.AuthMetadataPlugin):
    """
    Call-credentials plugin that adds a `cookie: couchers-sesh=<token>` entry to the metadata
    """

    def __init__(self, token: str):
        # Session token embedded into the cookie value on every call.
        self.token = token

    def __call__(self, context, callback) -> None:
        cookie_value = f"couchers-sesh={self.token}"
        metadata = (("cookie", cookie_value),)
        # The callback takes (metadata, error); None is passed as the error.
        callback(metadata, None)

92 

93 

class _MetadataKeeperInterceptor(grpc.UnaryUnaryClientInterceptor):
    """
    Client interceptor that remembers the initial metadata of the latest unary-unary call.

    Attributes:
        latest_headers: the initial metadata converted to a dict (a repeated key keeps
            only its last value, as with any ``dict(...)`` conversion).
        latest_header_raw: the initial metadata exactly as returned by the call.
    """

    def __init__(self):
        self.latest_headers = {}
        # Initialised here so that reading it before the first intercepted call yields an
        # empty value instead of raising AttributeError.
        self.latest_header_raw = ()

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """
        Forward the call unchanged, record its initial metadata, and return the call object.
        """
        call = continuation(client_call_details, request)
        # Fetch once so both attributes are derived from the same metadata object.
        raw_metadata = call.initial_metadata()
        self.latest_headers = dict(raw_metadata)
        self.latest_header_raw = raw_metadata
        return call

103 

104 

class FakeRpcError(grpc.RpcError):
    """
    RpcError carrying a status code and details string, exposed via ``code()`` and ``details()``.

    This is what ``MockGrpcContext.abort`` raises in place of a real gRPC error.
    """

    def __init__(self, code, details):
        # Populate Exception.args so str()/repr() of the error (e.g. in a failing test's
        # traceback) show the status code and details rather than an empty message.
        super().__init__(code, details)
        self._code = code
        self._details = details

    def code(self):
        """Return the status code given at construction."""
        return self._code

    def details(self):
        """Return the details message given at construction."""
        return self._details

115 

116 

def _check_user_perms(method: str, auth_info: UserAuthInfo | None) -> None:
    """
    Assert that a caller described by ``auth_info`` is allowed to invoke ``method``.

    ``method`` is a full gRPC path of the form "/org.couchers.api.core.API/GetUser". The
    required auth level is read from the ``auth_level`` extension on the service's
    descriptor options. A falsy ``auth_info`` is treated as an unauthenticated caller.

    Raises AssertionError when the caller does not satisfy the service's auth level.
    """
    # Splitting on "/" must give exactly three parts: "", service name, method name.
    _, service_name, _method_name = method.split("/")

    service_descriptor = get_descriptor_pool().FindServiceByName(service_name)
    auth_level = service_descriptor.GetOptions().Extensions[annotations_pb2.auth_level]

    assert auth_level != annotations_pb2.AUTH_LEVEL_UNKNOWN
    assert auth_level in (
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
        annotations_pb2.AUTH_LEVEL_SECURE,
        annotations_pb2.AUTH_LEVEL_EDITOR,
        annotations_pb2.AUTH_LEVEL_ADMIN,
    )

    if not auth_info:
        # Unauthenticated callers may only reach open services.
        assert auth_level == annotations_pb2.AUTH_LEVEL_OPEN
        return

    assert auth_level != annotations_pb2.AUTH_LEVEL_ADMIN or auth_info.is_superuser, (
        "Non-superuser tried to call superuser API"
    )
    assert auth_level != annotations_pb2.AUTH_LEVEL_EDITOR or auth_info.is_editor, (
        "Non-editor tried to call editor API"
    )
    assert not auth_info.is_jailed or auth_level in (
        annotations_pb2.AUTH_LEVEL_OPEN,
        annotations_pb2.AUTH_LEVEL_JAILED,
    ), "User is jailed but tried to call non-open/non-jailed API"

145 

146 

class MockGrpcContext:
    """
    In-memory substitute for grpc.ServicerContext, for use in tests.

    Invocation metadata starts out empty, sent initial metadata is accumulated in a list,
    and ``abort`` raises a FakeRpcError.
    """

    def __init__(self):
        self._invocation_metadata = []
        self._initial_metadata = []

    def invocation_metadata(self):
        """Return the stored invocation metadata list."""
        return self._invocation_metadata

    def send_initial_metadata(self, metadata):
        """Append every entry of ``metadata`` to the recorded initial metadata."""
        self._initial_metadata.extend(metadata)

    def abort(self, code, details):
        """Raise FakeRpcError(code, details)."""
        raise FakeRpcError(code, details)

164 

165 

166class FakeChannel: 

167 """ 

168 Mock gRPC channel for testing that orchestrates context creation. 

169 

170 This holds test state (token) and creates proper CouchersContext 

171 instances when handlers are invoked. 

172 """ 

173 

174 def __init__(self, token: str | None = None): 

175 self.handlers: dict[str, Any] = {} 

176 self._token = token 

177 

178 def add_generic_rpc_handlers(self, generic_rpc_handlers: Any): 

179 _validate_generic_rpc_handlers(generic_rpc_handlers) 

180 self.handlers.update(generic_rpc_handlers[0]._method_handlers) 

181 

182 def unary_unary(self, uri, request_serializer, response_deserializer): 

183 handler = self.handlers[uri] 

184 

185 def fake_handler(request): 

186 auth_info = _try_get_and_update_user_details( 

187 self._token, is_api_key=False, ip_address="127.0.0.1", user_agent="Testing User-Agent" 

188 ) 

189 

190 _check_user_perms(uri, auth_info) 

191 

192 # Do a full serialization cycle on the request and the 

193 # response to catch accidental use of unserializable data. 

194 request = handler.request_deserializer(request_serializer(request)) 

195 

196 with session_scope() as session: 

197 mock_grpc_ctx = MockGrpcContext() 

198 

199 context = make_interactive_context( 

200 grpc_context=mock_grpc_ctx, 

201 user_id=auth_info.user_id if auth_info else None, 

202 is_api_key=False, 

203 token=self._token if auth_info else None, 

204 ui_language_preference=auth_info.ui_language_preference if auth_info else None, 

205 ) 

206 

207 response = handler.unary_unary(request, context, session) 

208 

209 return response_deserializer(handler.response_serializer(response)) 

210 

211 return fake_handler 

212 

213 

@contextmanager
def run_server(grpc_channel_options=(), token: str | None = None):
    """
    Run a real gRPC server on a local port and yield ``(server, channel, metadata_interceptor)``.

    The server is created with CouchersMiddlewareInterceptor and is already started when
    yielded. The channel connects to it using local credentials; when ``token`` is truthy,
    a CookieMetadataPlugin for that token is added as call credentials. The yielded channel
    is wrapped with a _MetadataKeeperInterceptor, which is the third yielded element.
    The server is stopped when the context exits.
    """
    with futures.ThreadPoolExecutor(1) as pool:
        channel_creds = grpc.local_channel_credentials()
        if token:
            cookie_creds = grpc.metadata_call_credentials(CookieMetadataPlugin(token))
            channel_creds = grpc.composite_channel_credentials(channel_creds, cookie_creds)

        server = grpc.server(pool, interceptors=[CouchersMiddlewareInterceptor()])
        # Port 0 requests any free port; the bound port number is returned.
        bound_port = server.add_secure_port("localhost:0", grpc.local_server_credentials())
        server.start()

        try:
            target = f"localhost:{bound_port}"
            with grpc.secure_channel(target, channel_creds, options=grpc_channel_options) as raw_channel:
                keeper = _MetadataKeeperInterceptor()
                intercepted_channel = grpc.intercept_channel(raw_channel, keeper)
                yield server, intercepted_channel, keeper
        finally:
            server.stop(None).wait()

234 

235 

236# Sessions that start a real GRPC server. 

@contextmanager
def auth_api_session(
    grpc_channel_options=(),
) -> Generator[tuple[auth_pb2_grpc.AuthStub, grpc.UnaryUnaryClientInterceptor]]:
    """
    Yield an Auth stub together with the interceptor that records response metadata

    Runs against the real server, because the Auth tests work with headers
    """
    with run_server(grpc_channel_options) as (srv, chan, keeper):
        auth_pb2_grpc.add_AuthServicer_to_server(Auth(), srv)
        auth_stub = auth_pb2_grpc.AuthStub(chan)
        yield auth_stub, keeper

249 

250 

@contextmanager
def real_api_session(token: str):
    """
    Yield an API stub connected to a real server started by run_server, authenticated with the token
    """
    with run_server(token=token) as (srv, chan, _):
        api_pb2_grpc.add_APIServicer_to_server(API(), srv)
        yield api_pb2_grpc.APIStub(chan)

259 

260 

@contextmanager
def real_admin_session(token: str):
    """
    Yield an Admin stub connected to a real server started by run_server, authenticated with the token
    """
    with run_server(token=token) as (srv, chan, _):
        admin_pb2_grpc.add_AdminServicer_to_server(Admin(), srv)
        yield admin_pb2_grpc.AdminStub(chan)

269 

270 

@contextmanager
def real_editor_session(token: str):
    """
    Yield an Editor stub connected to a real server started by run_server, authenticated with the token
    """
    with run_server(token=token) as (srv, chan, _):
        editor_pb2_grpc.add_EditorServicer_to_server(Editor(), srv)
        yield editor_pb2_grpc.EditorStub(chan)

279 

280 

@contextmanager
def real_moderation_session(token: str):
    """
    Yield a Moderation stub connected to a real server started by run_server, authenticated with the token
    """
    with run_server(token=token) as (srv, chan, _):
        moderation_pb2_grpc.add_ModerationServicer_to_server(Moderation(), srv)
        yield moderation_pb2_grpc.ModerationStub(chan)

289 

290 

@contextmanager
def real_account_session(token: str):
    """
    Yield an Account stub connected to a real server started by run_server, authenticated with the token
    """
    with run_server(token=token) as (srv, chan, _):
        account_pb2_grpc.add_AccountServicer_to_server(Account(), srv)
        yield account_pb2_grpc.AccountStub(chan)

299 

300 

@contextmanager
def real_jail_session(token: str):
    """
    Yield a Jail stub connected to a real server started by run_server, authenticated with the token
    """
    with run_server(token=token) as (srv, chan, _):
        jail_pb2_grpc.add_JailServicer_to_server(Jail(), srv)
        yield jail_pb2_grpc.JailStub(chan)

309 

310 

@contextmanager
def real_stripe_session():
    """
    Yield a Stripe stub connected to a real server started by run_server, with no token
    """
    with run_server() as (srv, chan, _):
        stripe_pb2_grpc.add_StripeServicer_to_server(Stripe(), srv)
        yield stripe_pb2_grpc.StripeStub(chan)

319 

320 

@contextmanager
def real_iris_session():
    """
    Yield an Iris stub connected to a real server started by run_server, with no token
    """
    with run_server() as (srv, chan, _):
        iris_pb2_grpc.add_IrisServicer_to_server(Iris(), srv)
        yield iris_pb2_grpc.IrisStub(chan)

326 

327 

@contextmanager
def media_session(bearer_token: str):
    """
    Yield a Media stub connected to a newly started Media server

    The bearer token is used on both sides: the server gets the interceptor returned by
    get_media_auth_interceptor, and the client channel sends it as access-token call
    credentials. The server is stopped when the context exits.
    """
    auth_interceptor = get_media_auth_interceptor(bearer_token)

    with futures.ThreadPoolExecutor(1) as pool:
        media_server = grpc.server(pool, interceptors=[auth_interceptor])
        # Port 0 requests any free port; the bound port number is returned.
        bound_port = media_server.add_secure_port("localhost:0", grpc.local_server_credentials())
        media_pb2_grpc.add_MediaServicer_to_server(Media(), media_server)
        media_server.start()

        token_creds = grpc.access_token_call_credentials(bearer_token)
        channel_creds = grpc.composite_channel_credentials(grpc.local_channel_credentials(), token_creds)

        try:
            with grpc.secure_channel(f"localhost:{bound_port}", channel_creds) as chan:
                yield media_pb2_grpc.MediaStub(chan)
        finally:
            media_server.stop(None).wait()

349 

350 

351# Sessions that don't need to start a real GRPC server. 

352# Note: these don't need to be context managers, but they are so that 

353# we can switch to a real implementation if needed. 

@contextmanager
def api_session(token: str):
    """
    Yield an API stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    api_pb2_grpc.add_APIServicer_to_server(API(), fake_channel)
    stub = api_pb2_grpc.APIStub(fake_channel)
    yield stub

362 

363 

@contextmanager
def gis_session(token: str):
    """
    Yield a GIS stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    gis_pb2_grpc.add_GISServicer_to_server(GIS(), fake_channel)
    stub = gis_pb2_grpc.GISStub(fake_channel)
    yield stub

369 

370 

@contextmanager
def public_session():
    """
    Yield a Public stub backed by a FakeChannel created without a token
    """
    fake_channel = FakeChannel()
    public_pb2_grpc.add_PublicServicer_to_server(Public(), fake_channel)
    stub = public_pb2_grpc.PublicStub(fake_channel)
    yield stub

376 

377 

@contextmanager
def conversations_session(token: str):
    """
    Yield a Conversations stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    conversations_pb2_grpc.add_ConversationsServicer_to_server(Conversations(), fake_channel)
    stub = conversations_pb2_grpc.ConversationsStub(fake_channel)
    yield stub

386 

387 

@contextmanager
def requests_session(token: str):
    """
    Yield a Requests stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    requests_pb2_grpc.add_RequestsServicer_to_server(Requests(), fake_channel)
    stub = requests_pb2_grpc.RequestsStub(fake_channel)
    yield stub

396 

397 

@contextmanager
def threads_session(token: str):
    """
    Yield a Threads stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    threads_pb2_grpc.add_ThreadsServicer_to_server(Threads(), fake_channel)
    stub = threads_pb2_grpc.ThreadsStub(fake_channel)
    yield stub

403 

404 

@contextmanager
def discussions_session(token: str):
    """
    Yield a Discussions stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    discussions_pb2_grpc.add_DiscussionsServicer_to_server(Discussions(), fake_channel)
    stub = discussions_pb2_grpc.DiscussionsStub(fake_channel)
    yield stub

410 

411 

@contextmanager
def donations_session(token: str):
    """
    Yield a Donations stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    donations_pb2_grpc.add_DonationsServicer_to_server(Donations(), fake_channel)
    stub = donations_pb2_grpc.DonationsStub(fake_channel)
    yield stub

417 

418 

@contextmanager
def pages_session(token: str):
    """
    Yield a Pages stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    pages_pb2_grpc.add_PagesServicer_to_server(Pages(), fake_channel)
    stub = pages_pb2_grpc.PagesStub(fake_channel)
    yield stub

424 

425 

@contextmanager
def communities_session(token: str):
    """
    Yield a Communities stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    communities_pb2_grpc.add_CommunitiesServicer_to_server(Communities(), fake_channel)
    stub = communities_pb2_grpc.CommunitiesStub(fake_channel)
    yield stub

431 

432 

@contextmanager
def groups_session(token: str):
    """
    Yield a Groups stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    groups_pb2_grpc.add_GroupsServicer_to_server(Groups(), fake_channel)
    stub = groups_pb2_grpc.GroupsStub(fake_channel)
    yield stub

438 

439 

@contextmanager
def blocking_session(token: str):
    """
    Yield a Blocking stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    blocking_pb2_grpc.add_BlockingServicer_to_server(Blocking(), fake_channel)
    stub = blocking_pb2_grpc.BlockingStub(fake_channel)
    yield stub

445 

446 

@contextmanager
def notifications_session(token: str):
    """
    Yield a Notifications stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    notifications_pb2_grpc.add_NotificationsServicer_to_server(Notifications(), fake_channel)
    stub = notifications_pb2_grpc.NotificationsStub(fake_channel)
    yield stub

452 

453 

@contextmanager
def account_session(token: str):
    """
    Yield an Account stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    account_pb2_grpc.add_AccountServicer_to_server(Account(), fake_channel)
    stub = account_pb2_grpc.AccountStub(fake_channel)
    yield stub

462 

463 

@contextmanager
def search_session(token: str):
    """
    Yield a Search stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    search_pb2_grpc.add_SearchServicer_to_server(Search(), fake_channel)
    stub = search_pb2_grpc.SearchStub(fake_channel)
    yield stub

472 

473 

@contextmanager
def references_session(token: str):
    """
    Yield a References stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    references_pb2_grpc.add_ReferencesServicer_to_server(References(), fake_channel)
    stub = references_pb2_grpc.ReferencesStub(fake_channel)
    yield stub

482 

483 

@contextmanager
def galleries_session(token: str):
    """
    Yield a Galleries stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    galleries_pb2_grpc.add_GalleriesServicer_to_server(Galleries(), fake_channel)
    stub = galleries_pb2_grpc.GalleriesStub(fake_channel)
    yield stub

492 

493 

@contextmanager
def reporting_session(token: str):
    """
    Yield a Reporting stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    reporting_pb2_grpc.add_ReportingServicer_to_server(Reporting(), fake_channel)
    stub = reporting_pb2_grpc.ReportingStub(fake_channel)
    yield stub

499 

500 

@contextmanager
def events_session(token: str):
    """
    Yield an Events stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    events_pb2_grpc.add_EventsServicer_to_server(Events(), fake_channel)
    stub = events_pb2_grpc.EventsStub(fake_channel)
    yield stub

506 

507 

@contextmanager
def postal_verification_session(token: str):
    """
    Yield a PostalVerification stub backed by a FakeChannel that uses the token for auth
    """
    fake_channel = FakeChannel(token)
    postal_verification_pb2_grpc.add_PostalVerificationServicer_to_server(PostalVerification(), fake_channel)
    stub = postal_verification_pb2_grpc.PostalVerificationStub(fake_channel)
    yield stub

513 

514 

@contextmanager
def bugs_session(token: str | None = None):
    """
    Yield a Bugs stub backed by a FakeChannel; the token is optional and defaults to None
    """
    fake_channel = FakeChannel(token)
    bugs_pb2_grpc.add_BugsServicer_to_server(Bugs(), fake_channel)
    stub = bugs_pb2_grpc.BugsStub(fake_channel)
    yield stub

520 

521 

@contextmanager
def resources_session():
    """
    Yield a Resources stub backed by a FakeChannel created without a token
    """
    fake_channel = FakeChannel()
    resources_pb2_grpc.add_ResourcesServicer_to_server(Resources(), fake_channel)
    stub = resources_pb2_grpc.ResourcesStub(fake_channel)
    yield stub