Coverage for app/backend/src/couchers/servicers/communities.py: 80%
210 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import logging
2from collections.abc import Sequence
3from datetime import timedelta
5import grpc
6from google.protobuf import empty_pb2
7from sqlalchemy import select
8from sqlalchemy.orm import Session, selectinload
9from sqlalchemy.sql import delete, func, or_
11from couchers.constants import COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD
12from couchers.context import CouchersContext
13from couchers.crypto import decrypt_page_token, encrypt_page_token
14from couchers.db import can_moderate_node, get_node_parents_recursively, is_user_in_node_geography
15from couchers.event_log import log_event
16from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount
17from couchers.models import (
18 Cluster,
19 ClusterRole,
20 ClusterSubscription,
21 Comment,
22 Discussion,
23 Event,
24 EventOccurrence,
25 Node,
26 NodeType,
27 Page,
28 PageType,
29 User,
30)
31from couchers.proto import communities_pb2, communities_pb2_grpc, groups_pb2
32from couchers.servicers.discussions import discussion_to_pb
33from couchers.servicers.events import event_to_pb
34from couchers.servicers.groups import group_to_pb
35from couchers.servicers.pages import page_to_pb
36from couchers.sql import to_bool, users_visible, where_moderated_content_visible, where_users_column_visible
37from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
39logger = logging.getLogger(__name__)
41MAX_PAGINATION_LENGTH = 25
43nodetype2api = {
44 NodeType.world: communities_pb2.NODE_TYPE_WORLD,
45 NodeType.macroregion: communities_pb2.NODE_TYPE_MACROREGION,
46 NodeType.region: communities_pb2.NODE_TYPE_REGION,
47 NodeType.subregion: communities_pb2.NODE_TYPE_SUBREGION,
48 NodeType.locality: communities_pb2.NODE_TYPE_LOCALITY,
49 NodeType.sublocality: communities_pb2.NODE_TYPE_SUBLOCALITY,
50}
53def _parents_to_pb(session: Session, node_id: int) -> list[groups_pb2.Parent]:
54 parents = get_node_parents_recursively(session, node_id)
55 return [
56 groups_pb2.Parent(
57 community=groups_pb2.CommunityParent(
58 community_id=node_id,
59 name=cluster.name,
60 slug=cluster.slug,
61 description=cluster.description,
62 )
63 )
64 for node_id, parent_node_id, level, cluster in parents
65 ]
68def communities_to_pb(
69 session: Session, nodes: Sequence[Node], context: CouchersContext
70) -> list[communities_pb2.Community]:
71 can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes]
73 official_clusters = [node.official_cluster for node in nodes]
74 official_cluster_ids = [cluster.id for cluster in official_clusters]
76 member_counts: dict[int, int] = dict(
77 session.execute( # type: ignore[arg-type]
78 select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where(
79 ClusterSubscriptionCount.cluster_id.in_(official_cluster_ids)
80 )
81 ).all()
82 )
83 cluster_memberships = set(
84 session.execute(
85 select(ClusterSubscription.cluster_id)
86 .where(ClusterSubscription.user_id == context.user_id)
87 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
88 )
89 .scalars()
90 .all()
91 )
93 admin_counts: dict[int, int] = dict(
94 session.execute( # type: ignore[arg-type]
95 select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where(
96 ClusterAdminCount.cluster_id.in_(official_cluster_ids)
97 )
98 ).all()
99 )
100 cluster_adminships = set(
101 session.execute(
102 select(ClusterSubscription.cluster_id)
103 .where(ClusterSubscription.user_id == context.user_id)
104 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
105 .where(ClusterSubscription.role == ClusterRole.admin)
106 )
107 .scalars()
108 .all()
109 )
111 return [
112 communities_pb2.Community(
113 community_id=node.id,
114 name=official_cluster.name,
115 slug=official_cluster.slug,
116 description=official_cluster.description,
117 created=Timestamp_from_datetime(node.created),
118 parents=_parents_to_pb(session, node.id),
119 member=official_cluster.id in cluster_memberships,
120 admin=official_cluster.id in cluster_adminships,
121 member_count=member_counts.get(official_cluster.id, 1),
122 admin_count=admin_counts.get(official_cluster.id, 1),
123 main_page=page_to_pb(session, official_cluster.main_page, context),
124 can_moderate=can_moderate,
125 small_community_features_enabled=official_cluster.small_community_features_enabled,
126 node_type=nodetype2api[node.node_type],
127 )
128 for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates)
129 ]
132def community_to_pb(session: Session, node: Node, context: CouchersContext) -> communities_pb2.Community:
133 return communities_to_pb(session, [node], context)[0]
136class Communities(communities_pb2_grpc.CommunitiesServicer):
137 def GetCommunity(
138 self, request: communities_pb2.GetCommunityReq, context: CouchersContext, session: Session
139 ) -> communities_pb2.Community:
140 node = session.execute(
141 select(Node).where(Node.id == request.community_id).options(selectinload(Node.official_cluster))
142 ).scalar_one_or_none()
143 if not node: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
146 return community_to_pb(session, node, context)
148 def ListCommunities(
149 self, request: communities_pb2.ListCommunitiesReq, context: CouchersContext, session: Session
150 ) -> communities_pb2.ListCommunitiesRes:
151 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
152 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0
153 nodes = (
154 session.execute(
155 select(Node)
156 .join(Cluster, Cluster.parent_node_id == Node.id)
157 .where(or_(Node.parent_node_id == request.community_id, to_bool(request.community_id == 0)))
158 .where(Cluster.is_official_cluster)
159 .order_by(Cluster.name)
160 .limit(page_size + 1)
161 .offset(offset)
162 .options(selectinload(Node.official_cluster))
163 )
164 .scalars()
165 .all()
166 )
167 return communities_pb2.ListCommunitiesRes(
168 communities=communities_to_pb(session, nodes[:page_size], context),
169 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None,
170 )
172 def SearchCommunities(
173 self, request: communities_pb2.SearchCommunitiesReq, context: CouchersContext, session: Session
174 ) -> communities_pb2.SearchCommunitiesRes:
175 raw_query = request.query.strip()
176 if len(raw_query) < 3:
177 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "query_too_short")
179 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
181 word_similarity_score = func.word_similarity(func.unaccent(raw_query), func.immutable_unaccent(Cluster.name))
183 query = (
184 select(Node)
185 .join(Cluster, Cluster.parent_node_id == Node.id)
186 .where(Cluster.is_official_cluster)
187 .where(word_similarity_score > COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD)
188 .order_by(word_similarity_score.desc(), Cluster.name.asc(), Node.id.asc())
189 .limit(page_size)
190 )
192 rows = session.execute(query.options(selectinload(Node.official_cluster))).scalars().all()
194 return communities_pb2.SearchCommunitiesRes(communities=communities_to_pb(session, rows, context))
196 def ListRecentCommunities(
197 self, request: communities_pb2.ListRecentCommunitiesReq, context: CouchersContext, session: Session
198 ) -> communities_pb2.ListRecentCommunitiesRes:
199 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
200 rows = session.execute(
201 select(Node, Cluster)
202 .join(Cluster, Cluster.parent_node_id == Node.id)
203 .where(Cluster.is_official_cluster)
204 .order_by(Node.created.desc(), Node.id.desc())
205 .limit(page_size)
206 ).all()
207 return communities_pb2.ListRecentCommunitiesRes(
208 communities=[
209 communities_pb2.CommunitySummary(
210 community_id=node.id,
211 name=cluster.name,
212 slug=cluster.slug,
213 created=Timestamp_from_datetime(node.created),
214 node_type=nodetype2api[node.node_type],
215 )
216 for node, cluster in rows
217 ],
218 )
220 def ListGroups(
221 self, request: communities_pb2.ListGroupsReq, context: CouchersContext, session: Session
222 ) -> communities_pb2.ListGroupsRes:
223 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
224 next_cluster_id = int(request.page_token) if request.page_token else 0
225 clusters = (
226 session.execute(
227 select(Cluster)
228 .where(~Cluster.is_official_cluster) # not an official group
229 .where(Cluster.parent_node_id == request.community_id)
230 .where(Cluster.id >= next_cluster_id)
231 .order_by(Cluster.id)
232 .limit(page_size + 1)
233 )
234 .scalars()
235 .all()
236 )
237 return communities_pb2.ListGroupsRes(
238 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
239 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
240 )
242 def ListAdmins(
243 self, request: communities_pb2.ListAdminsReq, context: CouchersContext, session: Session
244 ) -> communities_pb2.ListAdminsRes:
245 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
246 next_admin_id = int(request.page_token) if request.page_token else 0
247 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
248 if not node: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true
249 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
250 admins = (
251 session.execute(
252 select(User)
253 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
254 .where(users_visible(context))
255 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
256 .where(ClusterSubscription.role == ClusterRole.admin)
257 .where(User.id >= next_admin_id)
258 .order_by(User.id)
259 .limit(page_size + 1)
260 )
261 .scalars()
262 .all()
263 )
264 return communities_pb2.ListAdminsRes(
265 admin_user_ids=[admin.id for admin in admins[:page_size]],
266 next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
267 )
269 def AddAdmin(
270 self, request: communities_pb2.AddAdminReq, context: CouchersContext, session: Session
271 ) -> empty_pb2.Empty:
272 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
273 if not node: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true
274 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
275 if not can_moderate_node(session, context.user_id, node.id):
276 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied")
278 user = session.execute(
279 select(User).where(users_visible(context)).where(User.id == request.user_id)
280 ).scalar_one_or_none()
281 if not user: 281 ↛ 282line 281 didn't jump to line 282 because the condition on line 281 was never true
282 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
284 subscription = session.execute(
285 select(ClusterSubscription)
286 .where(ClusterSubscription.user_id == user.id)
287 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
288 ).scalar_one_or_none()
289 if not subscription:
290 # Can't upgrade a member to admin if they're not already a member
291 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member")
292 if subscription.role == ClusterRole.admin:
293 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_admin")
295 subscription.role = ClusterRole.admin
297 return empty_pb2.Empty()
299 def RemoveAdmin(
300 self, request: communities_pb2.RemoveAdminReq, context: CouchersContext, session: Session
301 ) -> empty_pb2.Empty:
302 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
303 if not node: 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true
304 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
305 if not can_moderate_node(session, context.user_id, node.id): 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true
306 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied")
308 user = session.execute(
309 select(User).where(users_visible(context)).where(User.id == request.user_id)
310 ).scalar_one_or_none()
311 if not user: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true
312 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
314 subscription = session.execute(
315 select(ClusterSubscription)
316 .where(ClusterSubscription.user_id == user.id)
317 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
318 ).scalar_one_or_none()
319 if not subscription:
320 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member")
321 if subscription.role == ClusterRole.member:
322 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")
324 subscription.role = ClusterRole.member
326 return empty_pb2.Empty()
328 def ListMembers(
329 self, request: communities_pb2.ListMembersReq, context: CouchersContext, session: Session
330 ) -> communities_pb2.ListMembersRes:
331 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
332 next_member_id = int(request.page_token) if request.page_token else None
334 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
335 if not node: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true
336 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
338 query = (
339 select(User)
340 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
341 .where(users_visible(context))
342 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
343 )
344 if next_member_id is not None: 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true
345 query = query.where(User.id <= next_member_id)
346 members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()
348 return communities_pb2.ListMembersRes(
349 member_user_ids=[member.id for member in members[:page_size]],
350 next_page_token=str(members[-1].id) if len(members) > page_size else None,
351 )
353 def ListNearbyUsers(
354 self, request: communities_pb2.ListNearbyUsersReq, context: CouchersContext, session: Session
355 ) -> communities_pb2.ListNearbyUsersRes:
356 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
357 next_nearby_id = int(request.page_token) if request.page_token else 0
358 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
359 if not node: 359 ↛ 360line 359 didn't jump to line 360 because the condition on line 359 was never true
360 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
361 nearbys = (
362 session.execute(
363 select(User)
364 .where(users_visible(context))
365 .where(func.ST_Contains(node.geom, User.geom))
366 .where(User.id >= next_nearby_id)
367 .order_by(User.id)
368 .limit(page_size + 1)
369 )
370 .scalars()
371 .all()
372 )
373 return communities_pb2.ListNearbyUsersRes(
374 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
375 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None,
376 )
378 def ListPlaces(
379 self, request: communities_pb2.ListPlacesReq, context: CouchersContext, session: Session
380 ) -> communities_pb2.ListPlacesRes:
381 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
382 next_page_id = int(request.page_token) if request.page_token else 0
383 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
384 if not node:
385 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
386 places = (
387 node.official_cluster.owned_pages.where(Page.type == PageType.place)
388 .where(Page.id >= next_page_id)
389 .order_by(Page.id)
390 .limit(page_size + 1)
391 .all()
392 )
393 return communities_pb2.ListPlacesRes(
394 places=[page_to_pb(session, page, context) for page in places[:page_size]],
395 next_page_token=str(places[-1].id) if len(places) > page_size else None,
396 )
398 def ListGuides(
399 self, request: communities_pb2.ListGuidesReq, context: CouchersContext, session: Session
400 ) -> communities_pb2.ListGuidesRes:
401 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
402 next_page_id = int(request.page_token) if request.page_token else 0
403 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
404 if not node:
405 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
406 guides = (
407 node.official_cluster.owned_pages.where(Page.type == PageType.guide)
408 .where(Page.id >= next_page_id)
409 .order_by(Page.id)
410 .limit(page_size + 1)
411 .all()
412 )
413 return communities_pb2.ListGuidesRes(
414 guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
415 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
416 )
418 def ListEvents(
419 self, request: communities_pb2.ListEventsReq, context: CouchersContext, session: Session
420 ) -> communities_pb2.ListEventsRes:
421 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
422 # the page token is a unix timestamp of where we left off
423 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
425 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
426 if not node: 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true
427 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
428 if not node.official_cluster.small_community_features_enabled: 428 ↛ 429line 428 didn't jump to line 429 because the condition on line 428 was never true
429 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled")
431 if not request.include_parents: 431 ↛ 435line 431 didn't jump to line 435 because the condition on line 431 was always true
432 nodes_clusters_to_search = [(node.id, node.official_cluster)]
433 else:
434 # the first value is the node_id, the last is the cluster (object)
435 nodes_clusters_to_search = [
436 (parent[0], parent[3]) for parent in get_node_parents_recursively(session, node.id)
437 ]
439 membership_clauses = []
440 for node_id, official_cluster_obj in nodes_clusters_to_search:
441 membership_clauses.append(Event.owner_cluster == official_cluster_obj)
442 membership_clauses.append(Event.parent_node_id == node_id)
444 # for communities, we list events owned by this community or for which this is a parent
445 query = (
446 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*membership_clauses))
447 )
448 query = where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=True)
450 if request.past: 450 ↛ 451line 450 didn't jump to line 451 because the condition on line 450 was never true
451 cutoff = page_token + timedelta(seconds=1)
452 query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())
453 else:
454 cutoff = page_token - timedelta(seconds=1)
455 query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())
457 query = query.limit(page_size + 1)
458 occurrences = session.execute(query).scalars().all()
460 return communities_pb2.ListEventsRes(
461 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
462 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
463 )
465 def ListDiscussions(
466 self, request: communities_pb2.ListDiscussionsReq, context: CouchersContext, session: Session
467 ) -> communities_pb2.ListDiscussionsRes:
468 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
469 next_page_id = int(request.page_token) if request.page_token else 2**63 - 1
470 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
471 if not node: 471 ↛ 472line 471 didn't jump to line 472 because the condition on line 471 was never true
472 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
473 if not node.official_cluster.small_community_features_enabled: 473 ↛ 474line 473 didn't jump to line 474 because the condition on line 473 was never true
474 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "discussions_not_enabled")
475 has_visible_comments = (
476 where_moderated_content_visible(
477 where_users_column_visible(
478 select(func.count())
479 .select_from(Comment)
480 .where(Comment.thread_id == Discussion.thread_id)
481 .where(Comment.deleted == None),
482 context,
483 Comment.author_user_id,
484 ),
485 context,
486 Comment,
487 is_list_operation=True,
488 )
489 .correlate(Discussion)
490 .scalar_subquery()
491 )
492 discussions = (
493 session.execute(
494 where_moderated_content_visible(
495 select(Discussion)
496 .where(Discussion.owner_cluster_id == node.official_cluster.id)
497 .where((Discussion.deleted == None) | (has_visible_comments > 0))
498 .where(Discussion.id <= next_page_id)
499 .order_by(Discussion.id.desc())
500 .limit(page_size + 1),
501 context,
502 Discussion,
503 is_list_operation=True,
504 )
505 )
506 .scalars()
507 .all()
508 )
509 return communities_pb2.ListDiscussionsRes(
510 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
511 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
512 )
514 def JoinCommunity(
515 self, request: communities_pb2.JoinCommunityReq, context: CouchersContext, session: Session
516 ) -> empty_pb2.Empty:
517 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
518 if not node: 518 ↛ 519line 518 didn't jump to line 519 because the condition on line 518 was never true
519 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
521 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
522 if current_membership:
523 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_community")
525 node.official_cluster.cluster_subscriptions.append(
526 ClusterSubscription(
527 user_id=context.user_id,
528 cluster_id=node.official_cluster.id,
529 role=ClusterRole.member,
530 )
531 )
533 log_event(
534 context,
535 session,
536 "community.joined",
537 {"community_id": node.id, "community_name": node.official_cluster.name},
538 )
540 return empty_pb2.Empty()
542 def LeaveCommunity(
543 self, request: communities_pb2.LeaveCommunityReq, context: CouchersContext, session: Session
544 ) -> empty_pb2.Empty:
545 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
546 if not node: 546 ↛ 547line 546 didn't jump to line 547 because the condition on line 546 was never true
547 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
549 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
551 if not current_membership:
552 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_community")
554 if is_user_in_node_geography(session, context.user_id, node.id):
555 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cannot_leave_containing_community")
557 session.execute(
558 delete(ClusterSubscription)
559 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
560 .where(ClusterSubscription.user_id == context.user_id)
561 )
563 log_event(
564 context, session, "community.left", {"community_id": node.id, "community_name": node.official_cluster.name}
565 )
567 return empty_pb2.Empty()
569 def ListUserCommunities(
570 self, request: communities_pb2.ListUserCommunitiesReq, context: CouchersContext, session: Session
571 ) -> communities_pb2.ListUserCommunitiesRes:
572 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
573 next_node_id = int(request.page_token) if request.page_token else 0
574 user_id = request.user_id or context.user_id
575 nodes = (
576 session.execute(
577 select(Node)
578 .join(Cluster, Cluster.parent_node_id == Node.id)
579 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
580 .where(ClusterSubscription.user_id == user_id)
581 .where(Cluster.is_official_cluster)
582 .where(Node.id >= next_node_id)
583 .order_by(Node.id)
584 .limit(page_size + 1)
585 .options(selectinload(Node.official_cluster))
586 )
587 .scalars()
588 .all()
589 )
591 return communities_pb2.ListUserCommunitiesRes(
592 communities=communities_to_pb(session, nodes[:page_size], context),
593 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
594 )
596 def ListAllCommunities(
597 self, request: communities_pb2.ListAllCommunitiesReq, context: CouchersContext, session: Session
598 ) -> communities_pb2.ListAllCommunitiesRes:
599 """List all communities ordered hierarchically by parent-child relationships"""
600 # Get all nodes with their clusters, member counts, and user membership in a single query
601 results = session.execute(
602 select(
603 Node,
604 Cluster,
605 ClusterSubscriptionCount.count,
606 ClusterSubscription.cluster_id.label("user_subscription"),
607 )
608 .join(Cluster, Cluster.parent_node_id == Node.id)
609 .outerjoin(
610 ClusterSubscriptionCount,
611 ClusterSubscriptionCount.cluster_id == Cluster.id,
612 )
613 .outerjoin(
614 ClusterSubscription,
615 (ClusterSubscription.cluster_id == Cluster.id) & (ClusterSubscription.user_id == context.user_id),
616 )
617 .where(Cluster.is_official_cluster)
618 .order_by(Node.id)
619 ).all()
621 return communities_pb2.ListAllCommunitiesRes(
622 communities=[
623 communities_pb2.CommunitySummary(
624 community_id=node.id,
625 name=cluster.name,
626 slug=cluster.slug,
627 member=user_subscription is not None,
628 member_count=member_count or 1,
629 parents=_parents_to_pb(session, node.id),
630 created=Timestamp_from_datetime(node.created),
631 node_type=nodetype2api[node.node_type],
632 )
633 for node, cluster, member_count, user_subscription in results
634 ],
635 )