Coverage for app/backend/src/couchers/servicers/communities.py: 80%

210 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import logging 

2from collections.abc import Sequence 

3from datetime import timedelta 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from sqlalchemy import select 

8from sqlalchemy.orm import Session, selectinload 

9from sqlalchemy.sql import delete, func, or_ 

10 

11from couchers.constants import COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD 

12from couchers.context import CouchersContext 

13from couchers.crypto import decrypt_page_token, encrypt_page_token 

14from couchers.db import can_moderate_node, get_node_parents_recursively, is_user_in_node_geography 

15from couchers.event_log import log_event 

16from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount 

17from couchers.models import ( 

18 Cluster, 

19 ClusterRole, 

20 ClusterSubscription, 

21 Comment, 

22 Discussion, 

23 Event, 

24 EventOccurrence, 

25 Node, 

26 NodeType, 

27 Page, 

28 PageType, 

29 User, 

30) 

31from couchers.proto import communities_pb2, communities_pb2_grpc, groups_pb2 

32from couchers.servicers.discussions import discussion_to_pb 

33from couchers.servicers.events import event_to_pb 

34from couchers.servicers.groups import group_to_pb 

35from couchers.servicers.pages import page_to_pb 

36from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible 

37from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

38 

39logger = logging.getLogger(__name__) 

40 

41MAX_PAGINATION_LENGTH = 25 

42 

43nodetype2api = { 

44 NodeType.world: communities_pb2.NODE_TYPE_WORLD, 

45 NodeType.macroregion: communities_pb2.NODE_TYPE_MACROREGION, 

46 NodeType.region: communities_pb2.NODE_TYPE_REGION, 

47 NodeType.subregion: communities_pb2.NODE_TYPE_SUBREGION, 

48 NodeType.locality: communities_pb2.NODE_TYPE_LOCALITY, 

49 NodeType.sublocality: communities_pb2.NODE_TYPE_SUBLOCALITY, 

50} 

51 

52 

53def _parents_to_pb(session: Session, node_id: int) -> list[groups_pb2.Parent]: 

54 parents = get_node_parents_recursively(session, node_id) 

55 return [ 

56 groups_pb2.Parent( 

57 community=groups_pb2.CommunityParent( 

58 community_id=node_id, 

59 name=cluster.name, 

60 slug=cluster.slug, 

61 description=cluster.description, 

62 ) 

63 ) 

64 for node_id, parent_node_id, level, cluster in parents 

65 ] 

66 

67 

68def communities_to_pb( 

69 session: Session, nodes: Sequence[Node], context: CouchersContext 

70) -> list[communities_pb2.Community]: 

71 can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes] 

72 

73 official_clusters = [node.official_cluster for node in nodes] 

74 official_cluster_ids = [cluster.id for cluster in official_clusters] 

75 

76 member_counts: dict[int, int] = dict( 

77 session.execute( # type: ignore[arg-type] 

78 select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where( 

79 ClusterSubscriptionCount.cluster_id.in_(official_cluster_ids) 

80 ) 

81 ).all() 

82 ) 

83 cluster_memberships = set( 

84 session.execute( 

85 select(ClusterSubscription.cluster_id) 

86 .where(ClusterSubscription.user_id == context.user_id) 

87 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

88 ) 

89 .scalars() 

90 .all() 

91 ) 

92 

93 admin_counts: dict[int, int] = dict( 

94 session.execute( # type: ignore[arg-type] 

95 select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where( 

96 ClusterAdminCount.cluster_id.in_(official_cluster_ids) 

97 ) 

98 ).all() 

99 ) 

100 cluster_adminships = set( 

101 session.execute( 

102 select(ClusterSubscription.cluster_id) 

103 .where(ClusterSubscription.user_id == context.user_id) 

104 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

105 .where(ClusterSubscription.role == ClusterRole.admin) 

106 ) 

107 .scalars() 

108 .all() 

109 ) 

110 

111 return [ 

112 communities_pb2.Community( 

113 community_id=node.id, 

114 name=official_cluster.name, 

115 slug=official_cluster.slug, 

116 description=official_cluster.description, 

117 created=Timestamp_from_datetime(node.created), 

118 parents=_parents_to_pb(session, node.id), 

119 member=official_cluster.id in cluster_memberships, 

120 admin=official_cluster.id in cluster_adminships, 

121 member_count=member_counts.get(official_cluster.id, 1), 

122 admin_count=admin_counts.get(official_cluster.id, 1), 

123 main_page=page_to_pb(session, official_cluster.main_page, context), 

124 can_moderate=can_moderate, 

125 small_community_features_enabled=official_cluster.small_community_features_enabled, 

126 node_type=nodetype2api[node.node_type], 

127 ) 

128 for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates) 

129 ] 

130 

131 

132def community_to_pb(session: Session, node: Node, context: CouchersContext) -> communities_pb2.Community: 

133 return communities_to_pb(session, [node], context)[0] 

134 

135 

136class Communities(communities_pb2_grpc.CommunitiesServicer): 

137 def GetCommunity( 

138 self, request: communities_pb2.GetCommunityReq, context: CouchersContext, session: Session 

139 ) -> communities_pb2.Community: 

140 node = session.execute( 

141 select(Node).where(Node.id == request.community_id).options(selectinload(Node.official_cluster)) 

142 ).scalar_one_or_none() 

143 if not node: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

145 

146 return community_to_pb(session, node, context) 

147 

148 def ListCommunities( 

149 self, request: communities_pb2.ListCommunitiesReq, context: CouchersContext, session: Session 

150 ) -> communities_pb2.ListCommunitiesRes: 

151 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

152 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0 

153 nodes = ( 

154 session.execute( 

155 select(Node) 

156 .join(Cluster, Cluster.parent_node_id == Node.id) 

157 .where(or_(Node.parent_node_id == request.community_id, to_bool(request.community_id == 0))) 

158 .where(Cluster.is_official_cluster) 

159 .order_by(Cluster.name) 

160 .limit(page_size + 1) 

161 .offset(offset) 

162 .options(selectinload(Node.official_cluster)) 

163 ) 

164 .scalars() 

165 .all() 

166 ) 

167 return communities_pb2.ListCommunitiesRes( 

168 communities=communities_to_pb(session, nodes[:page_size], context), 

169 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None, 

170 ) 

171 

172 def SearchCommunities( 

173 self, request: communities_pb2.SearchCommunitiesReq, context: CouchersContext, session: Session 

174 ) -> communities_pb2.SearchCommunitiesRes: 

175 raw_query = request.query.strip() 

176 if len(raw_query) < 3: 

177 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "query_too_short") 

178 

179 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

180 

181 word_similarity_score = func.word_similarity(func.unaccent(raw_query), func.immutable_unaccent(Cluster.name)) 

182 

183 query = ( 

184 select(Node) 

185 .join(Cluster, Cluster.parent_node_id == Node.id) 

186 .where(Cluster.is_official_cluster) 

187 .where(word_similarity_score > COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD) 

188 .order_by(word_similarity_score.desc(), Cluster.name.asc(), Node.id.asc()) 

189 .limit(page_size) 

190 ) 

191 

192 rows = session.execute(query.options(selectinload(Node.official_cluster))).scalars().all() 

193 

194 return communities_pb2.SearchCommunitiesRes(communities=communities_to_pb(session, rows, context)) 

195 

196 def ListRecentCommunities( 

197 self, request: communities_pb2.ListRecentCommunitiesReq, context: CouchersContext, session: Session 

198 ) -> communities_pb2.ListRecentCommunitiesRes: 

199 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

200 rows = session.execute( 

201 select(Node, Cluster) 

202 .join(Cluster, Cluster.parent_node_id == Node.id) 

203 .where(Cluster.is_official_cluster) 

204 .order_by(Node.created.desc(), Node.id.desc()) 

205 .limit(page_size) 

206 ).all() 

207 return communities_pb2.ListRecentCommunitiesRes( 

208 communities=[ 

209 communities_pb2.CommunitySummary( 

210 community_id=node.id, 

211 name=cluster.name, 

212 slug=cluster.slug, 

213 created=Timestamp_from_datetime(node.created), 

214 node_type=nodetype2api[node.node_type], 

215 ) 

216 for node, cluster in rows 

217 ], 

218 ) 

219 

220 def ListGroups( 

221 self, request: communities_pb2.ListGroupsReq, context: CouchersContext, session: Session 

222 ) -> communities_pb2.ListGroupsRes: 

223 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

224 next_cluster_id = int(request.page_token) if request.page_token else 0 

225 clusters = ( 

226 session.execute( 

227 select(Cluster) 

228 .where(~Cluster.is_official_cluster) # not an official group 

229 .where(Cluster.parent_node_id == request.community_id) 

230 .where(Cluster.id >= next_cluster_id) 

231 .order_by(Cluster.id) 

232 .limit(page_size + 1) 

233 ) 

234 .scalars() 

235 .all() 

236 ) 

237 return communities_pb2.ListGroupsRes( 

238 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]], 

239 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

240 ) 

241 

242 def ListAdmins( 

243 self, request: communities_pb2.ListAdminsReq, context: CouchersContext, session: Session 

244 ) -> communities_pb2.ListAdminsRes: 

245 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

246 next_admin_id = int(request.page_token) if request.page_token else 0 

247 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

248 if not node: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true

249 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

250 admins = ( 

251 session.execute( 

252 select(User) 

253 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

254 .where(users_visible(context)) 

255 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

256 .where(ClusterSubscription.role == ClusterRole.admin) 

257 .where(User.id >= next_admin_id) 

258 .order_by(User.id) 

259 .limit(page_size + 1) 

260 ) 

261 .scalars() 

262 .all() 

263 ) 

264 return communities_pb2.ListAdminsRes( 

265 admin_user_ids=[admin.id for admin in admins[:page_size]], 

266 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

267 ) 

268 

269 def AddAdmin( 

270 self, request: communities_pb2.AddAdminReq, context: CouchersContext, session: Session 

271 ) -> empty_pb2.Empty: 

272 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

273 if not node: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true

274 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

275 if not can_moderate_node(session, context.user_id, node.id): 

276 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied") 

277 

278 user = session.execute( 

279 select(User).where(users_visible(context)).where(User.id == request.user_id) 

280 ).scalar_one_or_none() 

281 if not user: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true

282 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

283 

284 subscription = session.execute( 

285 select(ClusterSubscription) 

286 .where(ClusterSubscription.user_id == user.id) 

287 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

288 ).scalar_one_or_none() 

289 if not subscription: 

290 # Can't upgrade a member to admin if they're not already a member 

291 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member") 

292 if subscription.role == ClusterRole.admin: 

293 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_admin") 

294 

295 subscription.role = ClusterRole.admin 

296 

297 return empty_pb2.Empty() 

298 

299 def RemoveAdmin( 

300 self, request: communities_pb2.RemoveAdminReq, context: CouchersContext, session: Session 

301 ) -> empty_pb2.Empty: 

302 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

303 if not node: 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true

304 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

305 if not can_moderate_node(session, context.user_id, node.id): 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true

306 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied") 

307 

308 user = session.execute( 

309 select(User).where(users_visible(context)).where(User.id == request.user_id) 

310 ).scalar_one_or_none() 

311 if not user: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true

312 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

313 

314 subscription = session.execute( 

315 select(ClusterSubscription) 

316 .where(ClusterSubscription.user_id == user.id) 

317 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

318 ).scalar_one_or_none() 

319 if not subscription: 

320 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member") 

321 if subscription.role == ClusterRole.member: 

322 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin") 

323 

324 subscription.role = ClusterRole.member 

325 

326 return empty_pb2.Empty() 

327 

328 def ListMembers( 

329 self, request: communities_pb2.ListMembersReq, context: CouchersContext, session: Session 

330 ) -> communities_pb2.ListMembersRes: 

331 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

332 next_member_id = int(request.page_token) if request.page_token else None 

333 

334 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

335 if not node: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true

336 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

337 

338 query = ( 

339 select(User) 

340 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

341 .where(users_visible(context)) 

342 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

343 ) 

344 if next_member_id is not None: 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true

345 query = query.where(User.id <= next_member_id) 

346 members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all() 

347 

348 return communities_pb2.ListMembersRes( 

349 member_user_ids=[member.id for member in members[:page_size]], 

350 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

351 ) 

352 

353 def ListNearbyUsers( 

354 self, request: communities_pb2.ListNearbyUsersReq, context: CouchersContext, session: Session 

355 ) -> communities_pb2.ListNearbyUsersRes: 

356 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

357 next_nearby_id = int(request.page_token) if request.page_token else 0 

358 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

359 if not node: 359 ↛ 360line 359 didn't jump to line 360 because the condition on line 359 was never true

360 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

361 nearbys = ( 

362 session.execute( 

363 select(User) 

364 .where(users_visible(context)) 

365 .where(func.ST_Contains(node.geom, User.geom)) 

366 .where(User.id >= next_nearby_id) 

367 .order_by(User.id) 

368 .limit(page_size + 1) 

369 ) 

370 .scalars() 

371 .all() 

372 ) 

373 return communities_pb2.ListNearbyUsersRes( 

374 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]], 

375 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None, 

376 ) 

377 

378 def ListPlaces( 

379 self, request: communities_pb2.ListPlacesReq, context: CouchersContext, session: Session 

380 ) -> communities_pb2.ListPlacesRes: 

381 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

382 next_page_id = int(request.page_token) if request.page_token else 0 

383 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

384 if not node: 

385 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

386 places = ( 

387 node.official_cluster.owned_pages.where(Page.type == PageType.place) 

388 .where(Page.id >= next_page_id) 

389 .order_by(Page.id) 

390 .limit(page_size + 1) 

391 .all() 

392 ) 

393 return communities_pb2.ListPlacesRes( 

394 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

395 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

396 ) 

397 

398 def ListGuides( 

399 self, request: communities_pb2.ListGuidesReq, context: CouchersContext, session: Session 

400 ) -> communities_pb2.ListGuidesRes: 

401 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

402 next_page_id = int(request.page_token) if request.page_token else 0 

403 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

404 if not node: 

405 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

406 guides = ( 

407 node.official_cluster.owned_pages.where(Page.type == PageType.guide) 

408 .where(Page.id >= next_page_id) 

409 .order_by(Page.id) 

410 .limit(page_size + 1) 

411 .all() 

412 ) 

413 return communities_pb2.ListGuidesRes( 

414 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

415 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

416 ) 

417 

418 def ListEvents( 

419 self, request: communities_pb2.ListEventsReq, context: CouchersContext, session: Session 

420 ) -> communities_pb2.ListEventsRes: 

421 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

422 # the page token is a unix timestamp of where we left off 

423 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

424 

425 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

426 if not node: 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true

427 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

428 if not node.official_cluster.small_community_features_enabled: 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true

429 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled") 

430 

431 if not request.include_parents: 431 ↛ 435line 431 didn't jump to line 435 because the condition on line 431 was always true

432 nodes_clusters_to_search = [(node.id, node.official_cluster)] 

433 else: 

434 # the first value is the node_id, the last is the cluster (object) 

435 nodes_clusters_to_search = [ 

436 (parent[0], parent[3]) for parent in get_node_parents_recursively(session, node.id) 

437 ] 

438 

439 membership_clauses = [] 

440 for node_id, official_cluster_obj in nodes_clusters_to_search: 

441 membership_clauses.append(Event.owner_cluster == official_cluster_obj) 

442 membership_clauses.append(Event.parent_node_id == node_id) 

443 

444 # for communities, we list events owned by this community or for which this is a parent 

445 query = ( 

446 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*membership_clauses)) 

447 ) 

448 query = where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=True) 

449 

450 if request.past: 450 ↛ 451line 450 didn't jump to line 451 because the condition on line 450 was never true

451 cutoff = page_token + timedelta(seconds=1) 

452 query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc()) 

453 else: 

454 cutoff = page_token - timedelta(seconds=1) 

455 query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc()) 

456 

457 query = query.limit(page_size + 1) 

458 occurrences = session.execute(query).scalars().all() 

459 

460 return communities_pb2.ListEventsRes( 

461 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

462 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

463 ) 

464 

465 def ListDiscussions( 

466 self, request: communities_pb2.ListDiscussionsReq, context: CouchersContext, session: Session 

467 ) -> communities_pb2.ListDiscussionsRes: 

468 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

469 next_page_id = int(request.page_token) if request.page_token else 2**63 - 1 

470 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

471 if not node: 471 ↛ 472line 471 didn't jump to line 472 because the condition on line 471 was never true

472 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

473 if not node.official_cluster.small_community_features_enabled: 473 ↛ 474line 473 didn't jump to line 474 because the condition on line 473 was never true

474 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "discussions_not_enabled") 

475 has_visible_comments = ( 

476 where_moderated_content_visible( 

477 where_users_column_visible( 

478 select(func.count()) 

479 .select_from(Comment) 

480 .where(Comment.thread_id == Discussion.thread_id) 

481 .where(Comment.deleted == None), 

482 context, 

483 Comment.author_user_id, 

484 ), 

485 context, 

486 Comment, 

487 is_list_operation=True, 

488 ) 

489 .correlate(Discussion) 

490 .scalar_subquery() 

491 ) 

492 discussions = ( 

493 session.execute( 

494 where_moderated_content_visible( 

495 select(Discussion) 

496 .where(Discussion.owner_cluster_id == node.official_cluster.id) 

497 .where((Discussion.deleted == None) | (has_visible_comments > 0)) 

498 .where(Discussion.id <= next_page_id) 

499 .order_by(Discussion.id.desc()) 

500 .limit(page_size + 1), 

501 context, 

502 Discussion, 

503 is_list_operation=True, 

504 ) 

505 ) 

506 .scalars() 

507 .all() 

508 ) 

509 return communities_pb2.ListDiscussionsRes( 

510 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]], 

511 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

512 ) 

513 

514 def JoinCommunity( 

515 self, request: communities_pb2.JoinCommunityReq, context: CouchersContext, session: Session 

516 ) -> empty_pb2.Empty: 

517 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

518 if not node: 518 ↛ 519line 518 didn't jump to line 519 because the condition on line 518 was never true

519 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

520 

521 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

522 if current_membership: 

523 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_community") 

524 

525 node.official_cluster.cluster_subscriptions.append( 

526 ClusterSubscription( 

527 user_id=context.user_id, 

528 cluster_id=node.official_cluster.id, 

529 role=ClusterRole.member, 

530 ) 

531 ) 

532 

533 log_event( 

534 context, 

535 session, 

536 "community.joined", 

537 {"community_id": node.id, "community_name": node.official_cluster.name}, 

538 ) 

539 

540 return empty_pb2.Empty() 

541 

542 def LeaveCommunity( 

543 self, request: communities_pb2.LeaveCommunityReq, context: CouchersContext, session: Session 

544 ) -> empty_pb2.Empty: 

545 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

546 if not node: 546 ↛ 547line 546 didn't jump to line 547 because the condition on line 546 was never true

547 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

548 

549 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

550 

551 if not current_membership: 

552 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_community") 

553 

554 if is_user_in_node_geography(session, context.user_id, node.id): 

555 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cannot_leave_containing_community") 

556 

557 session.execute( 

558 delete(ClusterSubscription) 

559 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

560 .where(ClusterSubscription.user_id == context.user_id) 

561 ) 

562 

563 log_event( 

564 context, session, "community.left", {"community_id": node.id, "community_name": node.official_cluster.name} 

565 ) 

566 

567 return empty_pb2.Empty() 

568 

569 def ListUserCommunities( 

570 self, request: communities_pb2.ListUserCommunitiesReq, context: CouchersContext, session: Session 

571 ) -> communities_pb2.ListUserCommunitiesRes: 

572 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

573 next_node_id = int(request.page_token) if request.page_token else 0 

574 user_id = request.user_id or context.user_id 

575 nodes = ( 

576 session.execute( 

577 select(Node) 

578 .join(Cluster, Cluster.parent_node_id == Node.id) 

579 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

580 .where(ClusterSubscription.user_id == user_id) 

581 .where(Cluster.is_official_cluster) 

582 .where(Node.id >= next_node_id) 

583 .order_by(Node.id) 

584 .limit(page_size + 1) 

585 .options(selectinload(Node.official_cluster)) 

586 ) 

587 .scalars() 

588 .all() 

589 ) 

590 

591 return communities_pb2.ListUserCommunitiesRes( 

592 communities=communities_to_pb(session, nodes[:page_size], context), 

593 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

594 ) 

595 

596 def ListAllCommunities( 

597 self, request: communities_pb2.ListAllCommunitiesReq, context: CouchersContext, session: Session 

598 ) -> communities_pb2.ListAllCommunitiesRes: 

599 """List all communities ordered hierarchically by parent-child relationships""" 

600 # Get all nodes with their clusters, member counts, and user membership in a single query 

601 results = session.execute( 

602 select( 

603 Node, 

604 Cluster, 

605 ClusterSubscriptionCount.count, 

606 ClusterSubscription.cluster_id.label("user_subscription"), 

607 ) 

608 .join(Cluster, Cluster.parent_node_id == Node.id) 

609 .outerjoin( 

610 ClusterSubscriptionCount, 

611 ClusterSubscriptionCount.cluster_id == Cluster.id, 

612 ) 

613 .outerjoin( 

614 ClusterSubscription, 

615 (ClusterSubscription.cluster_id == Cluster.id) & (ClusterSubscription.user_id == context.user_id), 

616 ) 

617 .where(Cluster.is_official_cluster) 

618 .order_by(Node.id) 

619 ).all() 

620 

621 return communities_pb2.ListAllCommunitiesRes( 

622 communities=[ 

623 communities_pb2.CommunitySummary( 

624 community_id=node.id, 

625 name=cluster.name, 

626 slug=cluster.slug, 

627 member=user_subscription is not None, 

628 member_count=member_count or 1, 

629 parents=_parents_to_pb(session, node.id), 

630 created=Timestamp_from_datetime(node.created), 

631 node_type=nodetype2api[node.node_type], 

632 ) 

633 for node, cluster, member_count, user_subscription in results 

634 ], 

635 )