Coverage for app / backend / src / couchers / servicers / communities.py: 79%

200 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import logging 

2from collections.abc import Sequence 

3from datetime import timedelta 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from sqlalchemy import select 

8from sqlalchemy.orm import Session 

9from sqlalchemy.sql import delete, func, or_ 

10 

11from couchers.constants import COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD 

12from couchers.context import CouchersContext 

13from couchers.crypto import decrypt_page_token, encrypt_page_token 

14from couchers.db import can_moderate_node, get_node_parents_recursively, is_user_in_node_geography 

15from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount 

16from couchers.models import ( 

17 Cluster, 

18 ClusterRole, 

19 ClusterSubscription, 

20 Discussion, 

21 Event, 

22 EventOccurrence, 

23 Node, 

24 Page, 

25 PageType, 

26 User, 

27) 

28from couchers.proto import communities_pb2, communities_pb2_grpc, groups_pb2 

29from couchers.servicers.discussions import discussion_to_pb 

30from couchers.servicers.events import event_to_pb 

31from couchers.servicers.groups import group_to_pb 

32from couchers.servicers.pages import page_to_pb 

33from couchers.sql import to_bool, users_visible 

34from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

35 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Upper bound applied to ``request.page_size`` by the list/search RPCs below;
# it is also the page size used when the request leaves ``page_size`` unset (0).
MAX_PAGINATION_LENGTH = 25

39 

40 

def _parents_to_pb(session: Session, node_id: int) -> list[groups_pb2.Parent]:
    """Convert the rows of ``get_node_parents_recursively`` into ``Parent`` messages.

    Each row is unpacked as a 4-tuple ``(node id, parent node id, level,
    cluster)``. Only the node id and the cluster are used to fill the
    ``CommunityParent`` payload; the result keeps the order in which the helper
    returned the rows.
    """
    parent_messages = []
    for ancestor_id, _parent_node_id, _level, ancestor_cluster in get_node_parents_recursively(session, node_id):
        community_parent = groups_pb2.CommunityParent(
            community_id=ancestor_id,
            name=ancestor_cluster.name,
            slug=ancestor_cluster.slug,
            description=ancestor_cluster.description,
        )
        parent_messages.append(groups_pb2.Parent(community=community_parent))
    return parent_messages

54 

55 

def communities_to_pb(
    session: Session, nodes: Sequence[Node], context: CouchersContext
) -> list[communities_pb2.Community]:
    """Serialize a batch of nodes into ``Community`` messages.

    Member/admin counts and the requesting user's own subscriptions are looked
    up once for the whole batch, keyed by each node's official cluster id.
    ``can_moderate_node``, ``_parents_to_pb`` and ``page_to_pb`` are still
    called once per node.

    Args:
        session: database session used for all lookups.
        nodes: the nodes to serialize; the output has one message per node, in
            the same order.
        context: request context; ``context.user_id`` identifies the user the
            ``member`` / ``admin`` / ``can_moderate`` flags are computed for.

    Returns:
        One ``Community`` per input node. ``member_count`` and ``admin_count``
        fall back to 1 when the corresponding count view has no row for the
        cluster.
    """
    if not nodes:
        # Nothing to serialize: skip the batched lookups, which would otherwise
        # hit the database with empty IN (...) lists.
        return []

    can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes]

    official_clusters = [node.official_cluster for node in nodes]
    official_cluster_ids = [cluster.id for cluster in official_clusters]

    member_counts: dict[int, int] = dict(
        session.execute(  # type: ignore[arg-type]
            select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where(
                ClusterSubscriptionCount.cluster_id.in_(official_cluster_ids)
            )
        ).all()
    )
    admin_counts: dict[int, int] = dict(
        session.execute(  # type: ignore[arg-type]
            select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where(
                ClusterAdminCount.cluster_id.in_(official_cluster_ids)
            )
        ).all()
    )

    # A single query yields the requesting user's subscriptions together with
    # their role; both the membership and the adminship sets derive from it.
    own_subscriptions = session.execute(
        select(ClusterSubscription.cluster_id, ClusterSubscription.role)
        .where(ClusterSubscription.user_id == context.user_id)
        .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
    ).all()
    cluster_memberships = {cluster_id for cluster_id, _role in own_subscriptions}
    cluster_adminships = {cluster_id for cluster_id, role in own_subscriptions if role == ClusterRole.admin}

    return [
        communities_pb2.Community(
            community_id=node.id,
            name=official_cluster.name,
            slug=official_cluster.slug,
            description=official_cluster.description,
            created=Timestamp_from_datetime(node.created),
            parents=_parents_to_pb(session, node.id),
            member=official_cluster.id in cluster_memberships,
            admin=official_cluster.id in cluster_adminships,
            member_count=member_counts.get(official_cluster.id, 1),
            admin_count=admin_counts.get(official_cluster.id, 1),
            main_page=page_to_pb(session, official_cluster.main_page, context),
            can_moderate=can_moderate,
            discussions_enabled=official_cluster.discussions_enabled,
            events_enabled=official_cluster.events_enabled,
        )
        for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates)
    ]

118 

119 

def community_to_pb(session: Session, node: Node, context: CouchersContext) -> communities_pb2.Community:
    """Serialize one node by running it through ``communities_to_pb`` as a one-element batch."""
    serialized = communities_to_pb(session, [node], context)
    return serialized[0]

122 

123 

124class Communities(communities_pb2_grpc.CommunitiesServicer): 

def GetCommunity(
    self, request: communities_pb2.GetCommunityReq, context: CouchersContext, session: Session
) -> communities_pb2.Community:
    """Return the ``Community`` message for ``request.community_id``.

    Calls ``context.abort_with_error_code`` with NOT_FOUND /
    ``community_not_found`` when no node has that id.
    """
    node_lookup = select(Node).where(Node.id == request.community_id)
    community_node = session.execute(node_lookup).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")

    return community_to_pb(session, community_node, context)

133 

def ListCommunities(
    self, request: communities_pb2.ListCommunitiesReq, context: CouchersContext, session: Session
) -> communities_pb2.ListCommunitiesRes:
    """List official communities whose node is a direct child of ``request.community_id``.

    A ``community_id`` of 0 disables the parent filter, so every official
    community is listed. Pagination is offset based: the page token is an
    encrypted row offset, and one extra row is fetched to detect whether a
    further page exists.

    Returns:
        Up to ``page_size`` communities ordered by cluster name (ties broken by
        node id), plus ``next_page_token`` when more rows remain.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0
    nodes = (
        session.execute(
            select(Node)
            .join(Cluster, Cluster.parent_node_id == Node.id)
            .where(or_(Node.parent_node_id == request.community_id, to_bool(request.community_id == 0)))
            .where(Cluster.is_official_cluster)
            # LIMIT/OFFSET paging needs a total order: without the Node.id
            # tie-breaker, clusters sharing a name may be returned in a
            # different order per query and get repeated or skipped across pages.
            .order_by(Cluster.name, Node.id)
            .limit(page_size + 1)
            .offset(offset)
        )
        .scalars()
        .all()
    )
    return communities_pb2.ListCommunitiesRes(
        communities=communities_to_pb(session, nodes[:page_size], context),
        next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None,
    )

156 

def SearchCommunities(
    self, request: communities_pb2.SearchCommunitiesReq, context: CouchersContext, session: Session
) -> communities_pb2.SearchCommunitiesRes:
    """Fuzzy-search official communities by cluster name.

    The stripped query must be at least 3 characters long, otherwise
    ``context.abort_with_error_code`` is called with INVALID_ARGUMENT /
    ``query_too_short``. Matches are those whose ``word_similarity`` score
    against the unaccented cluster name exceeds
    ``COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD``; they are ranked by score
    (descending), then cluster name, then node id. At most ``page_size``
    results are returned and no page token is produced.
    """
    search_text = request.query.strip()
    if len(search_text) < 3:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "query_too_short")

    max_results = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    similarity = func.word_similarity(func.unaccent(search_text), func.immutable_unaccent(Cluster.name))
    ranking = (similarity.desc(), Cluster.name.asc(), Node.id.asc())

    statement = select(Node).join(Cluster, Cluster.parent_node_id == Node.id)
    statement = statement.where(Cluster.is_official_cluster)
    statement = statement.where(similarity > COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD)
    statement = statement.order_by(*ranking).limit(max_results)

    matching_nodes = session.execute(statement).scalars().all()

    return communities_pb2.SearchCommunitiesRes(communities=communities_to_pb(session, matching_nodes, context))

180 

def ListGroups(
    self, request: communities_pb2.ListGroupsReq, context: CouchersContext, session: Session
) -> communities_pb2.ListGroupsRes:
    """List the non-official clusters (groups) whose parent node is ``request.community_id``.

    Keyset pagination on ``Cluster.id`` ascending: the page token is the id of
    the first cluster of the page, and one extra row is fetched so that its id
    can serve as the next token.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    if request.page_token:
        first_cluster_id = int(request.page_token)
    else:
        first_cluster_id = 0

    statement = (
        select(Cluster)
        .where(~Cluster.is_official_cluster)  # groups only, not the official cluster
        .where(Cluster.parent_node_id == request.community_id)
        .where(Cluster.id >= first_cluster_id)
        .order_by(Cluster.id)
        .limit(limit + 1)
    )
    fetched = session.execute(statement).scalars().all()

    group_messages = [group_to_pb(session, group, context) for group in fetched[:limit]]
    # The surplus row, when present, is the first row of the following page.
    following_token = str(fetched[-1].id) if len(fetched) > limit else None
    return communities_pb2.ListGroupsRes(groups=group_messages, next_page_token=following_token)

202 

def ListAdmins(
    self, request: communities_pb2.ListAdminsReq, context: CouchersContext, session: Session
) -> communities_pb2.ListAdminsRes:
    """List the ids of visible users holding the admin role in a community's official cluster.

    Calls ``context.abort_with_error_code`` with NOT_FOUND /
    ``community_not_found`` when the node does not exist. Keyset pagination on
    ``User.id`` ascending; the token is the id of the first user of the page.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_admin_id = int(request.page_token) if request.page_token else 0

    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")

    statement = (
        select(User)
        .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
        .where(users_visible(context))
        .where(ClusterSubscription.cluster_id == community_node.official_cluster.id)
        .where(ClusterSubscription.role == ClusterRole.admin)
        .where(User.id >= first_admin_id)
        .order_by(User.id)
        .limit(limit + 1)
    )
    fetched = session.execute(statement).scalars().all()

    admin_ids = [user.id for user in fetched[:limit]]
    # The surplus row, when present, is the first row of the following page.
    following_token = str(fetched[-1].id) if len(fetched) > limit else None
    return communities_pb2.ListAdminsRes(admin_user_ids=admin_ids, next_page_token=following_token)

229 

def AddAdmin(
    self, request: communities_pb2.AddAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Promote an existing member of a community's official cluster to admin.

    Failure cases, each reported through ``context.abort_with_error_code``:
      * NOT_FOUND ``community_not_found`` - no node with the requested id.
      * FAILED_PRECONDITION ``node_moderate_permission_denied`` - the caller
        fails ``can_moderate_node`` for the node.
      * NOT_FOUND ``user_not_found`` - the target user is absent or not visible.
      * FAILED_PRECONDITION ``user_not_member`` - the target has no subscription.
      * FAILED_PRECONDITION ``user_already_admin`` - the target is already admin.
    """
    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
    caller_may_moderate = can_moderate_node(session, context.user_id, community_node.id)
    if not caller_may_moderate:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied")

    target_lookup = select(User).where(users_visible(context)).where(User.id == request.user_id)
    target_user = session.execute(target_lookup).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    membership_lookup = (
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == target_user.id)
        .where(ClusterSubscription.cluster_id == community_node.official_cluster.id)
    )
    membership = session.execute(membership_lookup).scalar_one_or_none()
    if not membership:
        # Only someone who is already a member can be upgraded to admin.
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member")
    if membership.role == ClusterRole.admin:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_admin")

    membership.role = ClusterRole.admin

    return empty_pb2.Empty()

259 

def RemoveAdmin(
    self, request: communities_pb2.RemoveAdminReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Demote an admin of a community's official cluster back to plain member.

    Failure cases, each reported through ``context.abort_with_error_code``:
      * NOT_FOUND ``community_not_found`` - no node with the requested id.
      * FAILED_PRECONDITION ``node_moderate_permission_denied`` - the caller
        fails ``can_moderate_node`` for the node.
      * NOT_FOUND ``user_not_found`` - the target user is absent or not visible.
      * FAILED_PRECONDITION ``user_not_member`` - the target has no subscription.
      * FAILED_PRECONDITION ``user_not_admin`` - the target's role is ``member``.
    """
    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
    caller_may_moderate = can_moderate_node(session, context.user_id, community_node.id)
    if not caller_may_moderate:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied")

    target_lookup = select(User).where(users_visible(context)).where(User.id == request.user_id)
    target_user = session.execute(target_lookup).scalar_one_or_none()
    if not target_user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    membership_lookup = (
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == target_user.id)
        .where(ClusterSubscription.cluster_id == community_node.official_cluster.id)
    )
    membership = session.execute(membership_lookup).scalar_one_or_none()
    if not membership:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member")
    if membership.role == ClusterRole.member:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")

    membership.role = ClusterRole.member

    return empty_pb2.Empty()

288 

def ListMembers(
    self, request: communities_pb2.ListMembersReq, context: CouchersContext, session: Session
) -> communities_pb2.ListMembersRes:
    """List the ids of visible users subscribed to a community's official cluster.

    Calls ``context.abort_with_error_code`` with NOT_FOUND /
    ``community_not_found`` when the node does not exist. Results are ordered
    by ``User.id`` descending; the page token is the id of the first user of
    the page, and no upper bound is applied when the token is absent.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    if request.page_token:
        highest_member_id = int(request.page_token)
    else:
        highest_member_id = None

    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")

    statement = (
        select(User)
        .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
        .where(users_visible(context))
        .where(ClusterSubscription.cluster_id == community_node.official_cluster.id)
    )
    if highest_member_id is not None:
        statement = statement.where(User.id <= highest_member_id)
    statement = statement.order_by(User.id.desc()).limit(limit + 1)
    fetched = session.execute(statement).scalars().all()

    member_ids = [user.id for user in fetched[:limit]]
    # The surplus row, when present, is the first row of the following page.
    following_token = str(fetched[-1].id) if len(fetched) > limit else None
    return communities_pb2.ListMembersRes(member_user_ids=member_ids, next_page_token=following_token)

313 

def ListNearbyUsers(
    self, request: communities_pb2.ListNearbyUsersReq, context: CouchersContext, session: Session
) -> communities_pb2.ListNearbyUsersRes:
    """List the ids of visible users for which ``ST_Contains(node.geom, User.geom)`` holds.

    Calls ``context.abort_with_error_code`` with NOT_FOUND /
    ``community_not_found`` when the node does not exist. Keyset pagination on
    ``User.id`` ascending; the token is the id of the first user of the page.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_user_id = int(request.page_token) if request.page_token else 0

    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")

    statement = (
        select(User)
        .where(users_visible(context))
        .where(func.ST_Contains(community_node.geom, User.geom))
        .where(User.id >= first_user_id)
        .order_by(User.id)
        .limit(limit + 1)
    )
    fetched = session.execute(statement).scalars().all()

    nearby_ids = [user.id for user in fetched[:limit]]
    # The surplus row, when present, is the first row of the following page.
    following_token = str(fetched[-1].id) if len(fetched) > limit else None
    return communities_pb2.ListNearbyUsersRes(nearby_user_ids=nearby_ids, next_page_token=following_token)

338 

def ListPlaces(
    self, request: communities_pb2.ListPlacesReq, context: CouchersContext, session: Session
) -> communities_pb2.ListPlacesRes:
    """List the ``PageType.place`` pages owned by a community's official cluster.

    Calls ``context.abort_with_error_code`` with NOT_FOUND /
    ``community_not_found`` when the node does not exist. Keyset pagination on
    ``Page.id`` ascending; the token is the id of the first page of the batch.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_page_id = int(request.page_token) if request.page_token else 0

    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")

    place_pages = community_node.official_cluster.owned_pages.where(Page.type == PageType.place)
    fetched = place_pages.where(Page.id >= first_page_id).order_by(Page.id).limit(limit + 1).all()

    place_messages = [page_to_pb(session, place, context) for place in fetched[:limit]]
    # The surplus row, when present, is the first row of the following page.
    following_token = str(fetched[-1].id) if len(fetched) > limit else None
    return communities_pb2.ListPlacesRes(places=place_messages, next_page_token=following_token)

358 

def ListGuides(
    self, request: communities_pb2.ListGuidesReq, context: CouchersContext, session: Session
) -> communities_pb2.ListGuidesRes:
    """List the ``PageType.guide`` pages owned by a community's official cluster.

    Calls ``context.abort_with_error_code`` with NOT_FOUND /
    ``community_not_found`` when the node does not exist. Keyset pagination on
    ``Page.id`` ascending; the token is the id of the first page of the batch.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_page_id = int(request.page_token) if request.page_token else 0

    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")

    guide_pages = community_node.official_cluster.owned_pages.where(Page.type == PageType.guide)
    fetched = guide_pages.where(Page.id >= first_page_id).order_by(Page.id).limit(limit + 1).all()

    guide_messages = [page_to_pb(session, guide, context) for guide in fetched[:limit]]
    # The surplus row, when present, is the first row of the following page.
    following_token = str(fetched[-1].id) if len(fetched) > limit else None
    return communities_pb2.ListGuidesRes(guides=guide_messages, next_page_token=following_token)

378 

def ListEvents(
    self, request: communities_pb2.ListEventsReq, context: CouchersContext, session: Session
) -> communities_pb2.ListEventsRes:
    """List event occurrences attached to a community, optionally including its parents.

    An occurrence matches when its event is owned by one of the searched
    official clusters or has one of the searched nodes as ``parent_node_id``.
    With ``request.include_parents`` the searched set comes from
    ``get_node_parents_recursively``; otherwise it is just the requested node.

    The page token is a millisecond timestamp (defaulting to ``now()``).
    ``request.past`` selects occurrences ending before it, newest start first;
    otherwise occurrences ending after it, earliest start first. A one-second
    margin is applied to the cutoff in both directions.

    Failure cases, each reported through ``context.abort_with_error_code``:
      * NOT_FOUND ``community_not_found`` - no node with the requested id.
      * FAILED_PRECONDITION ``events_not_enabled`` - the official cluster has
        events disabled.

    NOTE(review): rows are ordered by ``start_time`` while both the filter and
    the next token use ``end_time``; confirm that pages cannot overlap or skip
    rows when occurrences overlap in time.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    # The token carries the point in time (unix millis) at which the previous page stopped.
    if request.page_token:
        resume_from = dt_from_millis(int(request.page_token))
    else:
        resume_from = now()

    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
    if not community_node.official_cluster.events_enabled:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled")

    if request.include_parents:
        # Each row is indexed positionally: [0] is the node id, [3] the cluster object.
        search_scope = [(row[0], row[3]) for row in get_node_parents_recursively(session, community_node.id)]
    else:
        search_scope = [(community_node.id, community_node.official_cluster)]

    # Two alternatives per searched node: the event is owned by its official
    # cluster, or the event names that node as its parent.
    ownership_filters = [
        clause
        for searched_node_id, searched_cluster in search_scope
        for clause in (Event.owner_cluster == searched_cluster, Event.parent_node_id == searched_node_id)
    ]

    statement = (
        select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*ownership_filters))
    )

    one_second = timedelta(seconds=1)
    if request.past:
        statement = statement.where(EventOccurrence.end_time < resume_from + one_second)
        statement = statement.order_by(EventOccurrence.start_time.desc())
    else:
        statement = statement.where(EventOccurrence.end_time > resume_from - one_second)
        statement = statement.order_by(EventOccurrence.start_time.asc())

    fetched = session.execute(statement.limit(limit + 1)).scalars().all()

    event_messages = [event_to_pb(session, occurrence, context) for occurrence in fetched[:limit]]
    # The surplus row, when present, supplies the end time used as the next token.
    following_token = str(millis_from_dt(fetched[-1].end_time)) if len(fetched) > limit else None
    return communities_pb2.ListEventsRes(events=event_messages, next_page_token=following_token)

424 

def ListDiscussions(
    self, request: communities_pb2.ListDiscussionsReq, context: CouchersContext, session: Session
) -> communities_pb2.ListDiscussionsRes:
    """List the discussions owned by a community's official cluster, highest id first.

    The page token is the id of the first discussion of the page; a missing
    token (treated as 0) disables the upper bound.

    Failure cases, each reported through ``context.abort_with_error_code``:
      * NOT_FOUND ``community_not_found`` - no node with the requested id.
      * FAILED_PRECONDITION ``discussions_not_enabled`` - the official cluster
        has discussions disabled.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    highest_discussion_id = int(request.page_token) if request.page_token else 0

    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
    if not community_node.official_cluster.discussions_enabled:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "discussions_not_enabled")

    # Either the discussion is at or below the token id, or there is no token at all.
    within_page = or_(Discussion.id <= highest_discussion_id, to_bool(highest_discussion_id == 0))
    fetched = (
        community_node.official_cluster.owned_discussions.where(within_page)
        .order_by(Discussion.id.desc())
        .limit(limit + 1)
        .all()
    )

    discussion_messages = [discussion_to_pb(session, discussion, context) for discussion in fetched[:limit]]
    # The surplus row, when present, is the first row of the following page.
    following_token = str(fetched[-1].id) if len(fetched) > limit else None
    return communities_pb2.ListDiscussionsRes(discussions=discussion_messages, next_page_token=following_token)

447 

def JoinCommunity(
    self, request: communities_pb2.JoinCommunityReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Subscribe the calling user to a community's official cluster with the ``member`` role.

    Failure cases, each reported through ``context.abort_with_error_code``:
      * NOT_FOUND ``community_not_found`` - no node with the requested id.
      * FAILED_PRECONDITION ``already_in_community`` - the caller is already
        among the cluster's members.
    """
    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")

    existing_member = community_node.official_cluster.members.where(User.id == context.user_id).one_or_none()
    if existing_member:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_community")

    new_subscription = ClusterSubscription(
        user_id=context.user_id,
        cluster_id=community_node.official_cluster.id,
        role=ClusterRole.member,
    )
    community_node.official_cluster.cluster_subscriptions.append(new_subscription)

    return empty_pb2.Empty()

468 

def LeaveCommunity(
    self, request: communities_pb2.LeaveCommunityReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Delete the calling user's subscription to a community's official cluster.

    Failure cases, each reported through ``context.abort_with_error_code``:
      * NOT_FOUND ``community_not_found`` - no node with the requested id.
      * FAILED_PRECONDITION ``not_in_community`` - the caller is not among the
        cluster's members.
      * FAILED_PRECONDITION ``cannot_leave_containing_community`` -
        ``is_user_in_node_geography`` is true for the caller and this node.
    """
    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if not community_node:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")

    existing_member = community_node.official_cluster.members.where(User.id == context.user_id).one_or_none()
    if not existing_member:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_community")

    caller_inside_node = is_user_in_node_geography(session, context.user_id, community_node.id)
    if caller_inside_node:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cannot_leave_containing_community")

    removal = (
        delete(ClusterSubscription)
        .where(ClusterSubscription.cluster_id == community_node.official_cluster.id)
        .where(ClusterSubscription.user_id == context.user_id)
    )
    session.execute(removal)

    return empty_pb2.Empty()

491 

def ListUserCommunities(
    self, request: communities_pb2.ListUserCommunitiesReq, context: CouchersContext, session: Session
) -> communities_pb2.ListUserCommunitiesRes:
    """List the official communities a user is subscribed to.

    The user is ``request.user_id`` when set, otherwise the caller. Keyset
    pagination on ``Node.id`` ascending; the token is the id of the first node
    of the page.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_node_id = int(request.page_token) if request.page_token else 0
    subject_user_id = request.user_id or context.user_id

    statement = (
        select(Node)
        .join(Cluster, Cluster.parent_node_id == Node.id)
        .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
        .where(ClusterSubscription.user_id == subject_user_id)
        .where(Cluster.is_official_cluster)
        .where(Node.id >= first_node_id)
        .order_by(Node.id)
        .limit(limit + 1)
    )
    fetched = session.execute(statement).scalars().all()

    community_messages = communities_to_pb(session, fetched[:limit], context)
    # The surplus row, when present, is the first row of the following page.
    following_token = str(fetched[-1].id) if len(fetched) > limit else None
    return communities_pb2.ListUserCommunitiesRes(communities=community_messages, next_page_token=following_token)

517 

def ListAllCommunities(
    self, request: communities_pb2.ListAllCommunitiesReq, context: CouchersContext, session: Session
) -> communities_pb2.ListAllCommunitiesRes:
    """Return a ``CommunitySummary`` for every official community, ordered by node id.

    The response is not paginated. Each summary carries its own parent chain
    via ``_parents_to_pb``, which is resolved per community.
    """
    # One statement yields, per official cluster: the node, the cluster, its
    # member count (if the count view has a row) and the caller's own
    # subscription (if any); both extras come from outer joins.
    own_subscription = (ClusterSubscription.cluster_id == Cluster.id) & (
        ClusterSubscription.user_id == context.user_id
    )
    statement = (
        select(
            Node,
            Cluster,
            ClusterSubscriptionCount.count,
            ClusterSubscription.cluster_id.label("user_subscription"),
        )
        .join(Cluster, Cluster.parent_node_id == Node.id)
        .outerjoin(ClusterSubscriptionCount, ClusterSubscriptionCount.cluster_id == Cluster.id)
        .outerjoin(ClusterSubscription, own_subscription)
        .where(Cluster.is_official_cluster)
        .order_by(Node.id)
    )
    rows = session.execute(statement).all()

    summaries = []
    for community_node, official_cluster, subscriber_count, subscribed_cluster_id in rows:
        summaries.append(
            communities_pb2.CommunitySummary(
                community_id=community_node.id,
                name=official_cluster.name,
                slug=official_cluster.slug,
                # A NULL from the outer join means the caller has no subscription.
                member=subscribed_cluster_id is not None,
                # A missing (or zero) count is reported as 1.
                member_count=subscriber_count or 1,
                parents=_parents_to_pb(session, community_node.id),
                created=Timestamp_from_datetime(community_node.created),
            )
        )

    return communities_pb2.ListAllCommunitiesRes(communities=summaries)