Coverage for app / backend / src / couchers / servicers / communities.py: 80%

209 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import logging 

2from collections.abc import Sequence 

3from datetime import timedelta 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from sqlalchemy import select 

8from sqlalchemy.orm import Session, selectinload 

9from sqlalchemy.sql import delete, func, or_ 

10 

11from couchers.constants import COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD 

12from couchers.context import CouchersContext 

13from couchers.crypto import decrypt_page_token, encrypt_page_token 

14from couchers.db import can_moderate_node, get_node_parents_recursively, is_user_in_node_geography 

15from couchers.event_log import log_event 

16from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount 

17from couchers.models import ( 

18 Cluster, 

19 ClusterRole, 

20 ClusterSubscription, 

21 Discussion, 

22 Event, 

23 EventOccurrence, 

24 Node, 

25 NodeType, 

26 Page, 

27 PageType, 

28 User, 

29) 

30from couchers.proto import communities_pb2, communities_pb2_grpc, groups_pb2 

31from couchers.servicers.discussions import discussion_to_pb 

32from couchers.servicers.events import event_to_pb 

33from couchers.servicers.groups import group_to_pb 

34from couchers.servicers.pages import page_to_pb 

35from couchers.sql import to_bool, users_visible, where_moderated_content_visible 

36from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

37 

38logger = logging.getLogger(__name__) 

39 

40MAX_PAGINATION_LENGTH = 25 

41 

42nodetype2api = { 

43 NodeType.world: communities_pb2.NODE_TYPE_WORLD, 

44 NodeType.macroregion: communities_pb2.NODE_TYPE_MACROREGION, 

45 NodeType.region: communities_pb2.NODE_TYPE_REGION, 

46 NodeType.subregion: communities_pb2.NODE_TYPE_SUBREGION, 

47 NodeType.locality: communities_pb2.NODE_TYPE_LOCALITY, 

48 NodeType.sublocality: communities_pb2.NODE_TYPE_SUBLOCALITY, 

49} 

50 

51 

52def _parents_to_pb(session: Session, node_id: int) -> list[groups_pb2.Parent]: 

53 parents = get_node_parents_recursively(session, node_id) 

54 return [ 

55 groups_pb2.Parent( 

56 community=groups_pb2.CommunityParent( 

57 community_id=node_id, 

58 name=cluster.name, 

59 slug=cluster.slug, 

60 description=cluster.description, 

61 ) 

62 ) 

63 for node_id, parent_node_id, level, cluster in parents 

64 ] 

65 

66 

67def communities_to_pb( 

68 session: Session, nodes: Sequence[Node], context: CouchersContext 

69) -> list[communities_pb2.Community]: 

70 can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes] 

71 

72 official_clusters = [node.official_cluster for node in nodes] 

73 official_cluster_ids = [cluster.id for cluster in official_clusters] 

74 

75 member_counts: dict[int, int] = dict( 

76 session.execute( # type: ignore[arg-type] 

77 select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where( 

78 ClusterSubscriptionCount.cluster_id.in_(official_cluster_ids) 

79 ) 

80 ).all() 

81 ) 

82 cluster_memberships = set( 

83 session.execute( 

84 select(ClusterSubscription.cluster_id) 

85 .where(ClusterSubscription.user_id == context.user_id) 

86 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

87 ) 

88 .scalars() 

89 .all() 

90 ) 

91 

92 admin_counts: dict[int, int] = dict( 

93 session.execute( # type: ignore[arg-type] 

94 select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where( 

95 ClusterAdminCount.cluster_id.in_(official_cluster_ids) 

96 ) 

97 ).all() 

98 ) 

99 cluster_adminships = set( 

100 session.execute( 

101 select(ClusterSubscription.cluster_id) 

102 .where(ClusterSubscription.user_id == context.user_id) 

103 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

104 .where(ClusterSubscription.role == ClusterRole.admin) 

105 ) 

106 .scalars() 

107 .all() 

108 ) 

109 

110 return [ 

111 communities_pb2.Community( 

112 community_id=node.id, 

113 name=official_cluster.name, 

114 slug=official_cluster.slug, 

115 description=official_cluster.description, 

116 created=Timestamp_from_datetime(node.created), 

117 parents=_parents_to_pb(session, node.id), 

118 member=official_cluster.id in cluster_memberships, 

119 admin=official_cluster.id in cluster_adminships, 

120 member_count=member_counts.get(official_cluster.id, 1), 

121 admin_count=admin_counts.get(official_cluster.id, 1), 

122 main_page=page_to_pb(session, official_cluster.main_page, context), 

123 can_moderate=can_moderate, 

124 small_community_features_enabled=official_cluster.small_community_features_enabled, 

125 node_type=nodetype2api[node.node_type], 

126 ) 

127 for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates) 

128 ] 

129 

130 

131def community_to_pb(session: Session, node: Node, context: CouchersContext) -> communities_pb2.Community: 

132 return communities_to_pb(session, [node], context)[0] 

133 

134 

135class Communities(communities_pb2_grpc.CommunitiesServicer): 

136 def GetCommunity( 

137 self, request: communities_pb2.GetCommunityReq, context: CouchersContext, session: Session 

138 ) -> communities_pb2.Community: 

139 node = session.execute( 

140 select(Node).where(Node.id == request.community_id).options(selectinload(Node.official_cluster)) 

141 ).scalar_one_or_none() 

142 if not node: 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true

143 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

144 

145 return community_to_pb(session, node, context) 

146 

147 def ListCommunities( 

148 self, request: communities_pb2.ListCommunitiesReq, context: CouchersContext, session: Session 

149 ) -> communities_pb2.ListCommunitiesRes: 

150 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

151 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0 

152 nodes = ( 

153 session.execute( 

154 select(Node) 

155 .join(Cluster, Cluster.parent_node_id == Node.id) 

156 .where(or_(Node.parent_node_id == request.community_id, to_bool(request.community_id == 0))) 

157 .where(Cluster.is_official_cluster) 

158 .order_by(Cluster.name) 

159 .limit(page_size + 1) 

160 .offset(offset) 

161 .options(selectinload(Node.official_cluster)) 

162 ) 

163 .scalars() 

164 .all() 

165 ) 

166 return communities_pb2.ListCommunitiesRes( 

167 communities=communities_to_pb(session, nodes[:page_size], context), 

168 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None, 

169 ) 

170 

171 def SearchCommunities( 

172 self, request: communities_pb2.SearchCommunitiesReq, context: CouchersContext, session: Session 

173 ) -> communities_pb2.SearchCommunitiesRes: 

174 raw_query = request.query.strip() 

175 if len(raw_query) < 3: 

176 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "query_too_short") 

177 

178 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

179 

180 word_similarity_score = func.word_similarity(func.unaccent(raw_query), func.immutable_unaccent(Cluster.name)) 

181 

182 query = ( 

183 select(Node) 

184 .join(Cluster, Cluster.parent_node_id == Node.id) 

185 .where(Cluster.is_official_cluster) 

186 .where(word_similarity_score > COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD) 

187 .order_by(word_similarity_score.desc(), Cluster.name.asc(), Node.id.asc()) 

188 .limit(page_size) 

189 ) 

190 

191 rows = session.execute(query.options(selectinload(Node.official_cluster))).scalars().all() 

192 

193 return communities_pb2.SearchCommunitiesRes(communities=communities_to_pb(session, rows, context)) 

194 

195 def ListRecentCommunities( 

196 self, request: communities_pb2.ListRecentCommunitiesReq, context: CouchersContext, session: Session 

197 ) -> communities_pb2.ListRecentCommunitiesRes: 

198 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

199 rows = session.execute( 

200 select(Node, Cluster) 

201 .join(Cluster, Cluster.parent_node_id == Node.id) 

202 .where(Cluster.is_official_cluster) 

203 .order_by(Node.created.desc(), Node.id.desc()) 

204 .limit(page_size) 

205 ).all() 

206 return communities_pb2.ListRecentCommunitiesRes( 

207 communities=[ 

208 communities_pb2.CommunitySummary( 

209 community_id=node.id, 

210 name=cluster.name, 

211 slug=cluster.slug, 

212 created=Timestamp_from_datetime(node.created), 

213 node_type=nodetype2api[node.node_type], 

214 ) 

215 for node, cluster in rows 

216 ], 

217 ) 

218 

219 def ListGroups( 

220 self, request: communities_pb2.ListGroupsReq, context: CouchersContext, session: Session 

221 ) -> communities_pb2.ListGroupsRes: 

222 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

223 next_cluster_id = int(request.page_token) if request.page_token else 0 

224 clusters = ( 

225 session.execute( 

226 select(Cluster) 

227 .where(~Cluster.is_official_cluster) # not an official group 

228 .where(Cluster.parent_node_id == request.community_id) 

229 .where(Cluster.id >= next_cluster_id) 

230 .order_by(Cluster.id) 

231 .limit(page_size + 1) 

232 ) 

233 .scalars() 

234 .all() 

235 ) 

236 return communities_pb2.ListGroupsRes( 

237 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]], 

238 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

239 ) 

240 

241 def ListAdmins( 

242 self, request: communities_pb2.ListAdminsReq, context: CouchersContext, session: Session 

243 ) -> communities_pb2.ListAdminsRes: 

244 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

245 next_admin_id = int(request.page_token) if request.page_token else 0 

246 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

247 if not node: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true

248 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

249 admins = ( 

250 session.execute( 

251 select(User) 

252 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

253 .where(users_visible(context)) 

254 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

255 .where(ClusterSubscription.role == ClusterRole.admin) 

256 .where(User.id >= next_admin_id) 

257 .order_by(User.id) 

258 .limit(page_size + 1) 

259 ) 

260 .scalars() 

261 .all() 

262 ) 

263 return communities_pb2.ListAdminsRes( 

264 admin_user_ids=[admin.id for admin in admins[:page_size]], 

265 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

266 ) 

267 

268 def AddAdmin( 

269 self, request: communities_pb2.AddAdminReq, context: CouchersContext, session: Session 

270 ) -> empty_pb2.Empty: 

271 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

272 if not node: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true

273 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

274 if not can_moderate_node(session, context.user_id, node.id): 

275 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied") 

276 

277 user = session.execute( 

278 select(User).where(users_visible(context)).where(User.id == request.user_id) 

279 ).scalar_one_or_none() 

280 if not user: 280 ↛ 281line 280 didn't jump to line 281 because the condition on line 280 was never true

281 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

282 

283 subscription = session.execute( 

284 select(ClusterSubscription) 

285 .where(ClusterSubscription.user_id == user.id) 

286 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

287 ).scalar_one_or_none() 

288 if not subscription: 

289 # Can't upgrade a member to admin if they're not already a member 

290 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member") 

291 if subscription.role == ClusterRole.admin: 

292 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_admin") 

293 

294 subscription.role = ClusterRole.admin 

295 

296 return empty_pb2.Empty() 

297 

298 def RemoveAdmin( 

299 self, request: communities_pb2.RemoveAdminReq, context: CouchersContext, session: Session 

300 ) -> empty_pb2.Empty: 

301 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

302 if not node: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true

303 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

304 if not can_moderate_node(session, context.user_id, node.id): 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true

305 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied") 

306 

307 user = session.execute( 

308 select(User).where(users_visible(context)).where(User.id == request.user_id) 

309 ).scalar_one_or_none() 

310 if not user: 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true

311 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

312 

313 subscription = session.execute( 

314 select(ClusterSubscription) 

315 .where(ClusterSubscription.user_id == user.id) 

316 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

317 ).scalar_one_or_none() 

318 if not subscription: 

319 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member") 

320 if subscription.role == ClusterRole.member: 

321 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin") 

322 

323 subscription.role = ClusterRole.member 

324 

325 return empty_pb2.Empty() 

326 

327 def ListMembers( 

328 self, request: communities_pb2.ListMembersReq, context: CouchersContext, session: Session 

329 ) -> communities_pb2.ListMembersRes: 

330 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

331 next_member_id = int(request.page_token) if request.page_token else None 

332 

333 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

334 if not node: 334 ↛ 335line 334 didn't jump to line 335 because the condition on line 334 was never true

335 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

336 

337 query = ( 

338 select(User) 

339 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

340 .where(users_visible(context)) 

341 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

342 ) 

343 if next_member_id is not None: 343 ↛ 344line 343 didn't jump to line 344 because the condition on line 343 was never true

344 query = query.where(User.id <= next_member_id) 

345 members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all() 

346 

347 return communities_pb2.ListMembersRes( 

348 member_user_ids=[member.id for member in members[:page_size]], 

349 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

350 ) 

351 

352 def ListNearbyUsers( 

353 self, request: communities_pb2.ListNearbyUsersReq, context: CouchersContext, session: Session 

354 ) -> communities_pb2.ListNearbyUsersRes: 

355 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

356 next_nearby_id = int(request.page_token) if request.page_token else 0 

357 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

358 if not node: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true

359 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

360 nearbys = ( 

361 session.execute( 

362 select(User) 

363 .where(users_visible(context)) 

364 .where(func.ST_Contains(node.geom, User.geom)) 

365 .where(User.id >= next_nearby_id) 

366 .order_by(User.id) 

367 .limit(page_size + 1) 

368 ) 

369 .scalars() 

370 .all() 

371 ) 

372 return communities_pb2.ListNearbyUsersRes( 

373 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]], 

374 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None, 

375 ) 

376 

377 def ListPlaces( 

378 self, request: communities_pb2.ListPlacesReq, context: CouchersContext, session: Session 

379 ) -> communities_pb2.ListPlacesRes: 

380 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

381 next_page_id = int(request.page_token) if request.page_token else 0 

382 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

383 if not node: 

384 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

385 places = ( 

386 node.official_cluster.owned_pages.where(Page.type == PageType.place) 

387 .where(Page.id >= next_page_id) 

388 .order_by(Page.id) 

389 .limit(page_size + 1) 

390 .all() 

391 ) 

392 return communities_pb2.ListPlacesRes( 

393 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

394 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

395 ) 

396 

397 def ListGuides( 

398 self, request: communities_pb2.ListGuidesReq, context: CouchersContext, session: Session 

399 ) -> communities_pb2.ListGuidesRes: 

400 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

401 next_page_id = int(request.page_token) if request.page_token else 0 

402 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

403 if not node: 

404 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

405 guides = ( 

406 node.official_cluster.owned_pages.where(Page.type == PageType.guide) 

407 .where(Page.id >= next_page_id) 

408 .order_by(Page.id) 

409 .limit(page_size + 1) 

410 .all() 

411 ) 

412 return communities_pb2.ListGuidesRes( 

413 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

414 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

415 ) 

416 

417 def ListEvents( 

418 self, request: communities_pb2.ListEventsReq, context: CouchersContext, session: Session 

419 ) -> communities_pb2.ListEventsRes: 

420 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

421 # the page token is a unix timestamp of where we left off 

422 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

423 

424 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

425 if not node: 425 ↛ 426line 425 didn't jump to line 426 because the condition on line 425 was never true

426 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

427 if not node.official_cluster.small_community_features_enabled: 427 ↛ 428line 427 didn't jump to line 428 because the condition on line 427 was never true

428 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled") 

429 

430 if not request.include_parents: 430 ↛ 434line 430 didn't jump to line 434 because the condition on line 430 was always true

431 nodes_clusters_to_search = [(node.id, node.official_cluster)] 

432 else: 

433 # the first value is the node_id, the last is the cluster (object) 

434 nodes_clusters_to_search = [ 

435 (parent[0], parent[3]) for parent in get_node_parents_recursively(session, node.id) 

436 ] 

437 

438 membership_clauses = [] 

439 for node_id, official_cluster_obj in nodes_clusters_to_search: 

440 membership_clauses.append(Event.owner_cluster == official_cluster_obj) 

441 membership_clauses.append(Event.parent_node_id == node_id) 

442 

443 # for communities, we list events owned by this community or for which this is a parent 

444 query = ( 

445 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*membership_clauses)) 

446 ) 

447 query = where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=True) 

448 

449 if request.past: 449 ↛ 450line 449 didn't jump to line 450 because the condition on line 449 was never true

450 cutoff = page_token + timedelta(seconds=1) 

451 query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc()) 

452 else: 

453 cutoff = page_token - timedelta(seconds=1) 

454 query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc()) 

455 

456 query = query.limit(page_size + 1) 

457 occurrences = session.execute(query).scalars().all() 

458 

459 return communities_pb2.ListEventsRes( 

460 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

461 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

462 ) 

463 

464 def ListDiscussions( 

465 self, request: communities_pb2.ListDiscussionsReq, context: CouchersContext, session: Session 

466 ) -> communities_pb2.ListDiscussionsRes: 

467 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

468 next_page_id = int(request.page_token) if request.page_token else 0 

469 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

470 if not node: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true

471 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

472 if not node.official_cluster.small_community_features_enabled: 472 ↛ 473line 472 didn't jump to line 473 because the condition on line 472 was never true

473 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "discussions_not_enabled") 

474 discussions = ( 

475 node.official_cluster.owned_discussions.where( 

476 or_(Discussion.id <= next_page_id, to_bool(next_page_id == 0)) 

477 ) 

478 .order_by(Discussion.id.desc()) 

479 .limit(page_size + 1) 

480 .all() 

481 ) 

482 return communities_pb2.ListDiscussionsRes( 

483 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]], 

484 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

485 ) 

486 

487 def JoinCommunity( 

488 self, request: communities_pb2.JoinCommunityReq, context: CouchersContext, session: Session 

489 ) -> empty_pb2.Empty: 

490 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

491 if not node: 491 ↛ 492line 491 didn't jump to line 492 because the condition on line 491 was never true

492 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

493 

494 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

495 if current_membership: 

496 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_community") 

497 

498 node.official_cluster.cluster_subscriptions.append( 

499 ClusterSubscription( 

500 user_id=context.user_id, 

501 cluster_id=node.official_cluster.id, 

502 role=ClusterRole.member, 

503 ) 

504 ) 

505 

506 log_event( 

507 context, 

508 session, 

509 "community.joined", 

510 {"community_id": node.id, "community_name": node.official_cluster.name}, 

511 ) 

512 

513 return empty_pb2.Empty() 

514 

515 def LeaveCommunity( 

516 self, request: communities_pb2.LeaveCommunityReq, context: CouchersContext, session: Session 

517 ) -> empty_pb2.Empty: 

518 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

519 if not node: 519 ↛ 520line 519 didn't jump to line 520 because the condition on line 519 was never true

520 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

521 

522 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

523 

524 if not current_membership: 

525 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_community") 

526 

527 if is_user_in_node_geography(session, context.user_id, node.id): 

528 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cannot_leave_containing_community") 

529 

530 session.execute( 

531 delete(ClusterSubscription) 

532 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

533 .where(ClusterSubscription.user_id == context.user_id) 

534 ) 

535 

536 log_event( 

537 context, session, "community.left", {"community_id": node.id, "community_name": node.official_cluster.name} 

538 ) 

539 

540 return empty_pb2.Empty() 

541 

542 def ListUserCommunities( 

543 self, request: communities_pb2.ListUserCommunitiesReq, context: CouchersContext, session: Session 

544 ) -> communities_pb2.ListUserCommunitiesRes: 

545 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

546 next_node_id = int(request.page_token) if request.page_token else 0 

547 user_id = request.user_id or context.user_id 

548 nodes = ( 

549 session.execute( 

550 select(Node) 

551 .join(Cluster, Cluster.parent_node_id == Node.id) 

552 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

553 .where(ClusterSubscription.user_id == user_id) 

554 .where(Cluster.is_official_cluster) 

555 .where(Node.id >= next_node_id) 

556 .order_by(Node.id) 

557 .limit(page_size + 1) 

558 .options(selectinload(Node.official_cluster)) 

559 ) 

560 .scalars() 

561 .all() 

562 ) 

563 

564 return communities_pb2.ListUserCommunitiesRes( 

565 communities=communities_to_pb(session, nodes[:page_size], context), 

566 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

567 ) 

568 

569 def ListAllCommunities( 

570 self, request: communities_pb2.ListAllCommunitiesReq, context: CouchersContext, session: Session 

571 ) -> communities_pb2.ListAllCommunitiesRes: 

572 """List all communities ordered hierarchically by parent-child relationships""" 

573 # Get all nodes with their clusters, member counts, and user membership in a single query 

574 results = session.execute( 

575 select( 

576 Node, 

577 Cluster, 

578 ClusterSubscriptionCount.count, 

579 ClusterSubscription.cluster_id.label("user_subscription"), 

580 ) 

581 .join(Cluster, Cluster.parent_node_id == Node.id) 

582 .outerjoin( 

583 ClusterSubscriptionCount, 

584 ClusterSubscriptionCount.cluster_id == Cluster.id, 

585 ) 

586 .outerjoin( 

587 ClusterSubscription, 

588 (ClusterSubscription.cluster_id == Cluster.id) & (ClusterSubscription.user_id == context.user_id), 

589 ) 

590 .where(Cluster.is_official_cluster) 

591 .order_by(Node.id) 

592 ).all() 

593 

594 return communities_pb2.ListAllCommunitiesRes( 

595 communities=[ 

596 communities_pb2.CommunitySummary( 

597 community_id=node.id, 

598 name=cluster.name, 

599 slug=cluster.slug, 

600 member=user_subscription is not None, 

601 member_count=member_count or 1, 

602 parents=_parents_to_pb(session, node.id), 

603 created=Timestamp_from_datetime(node.created), 

604 node_type=nodetype2api[node.node_type], 

605 ) 

606 for node, cluster, member_count, user_subscription in results 

607 ], 

608 )