Coverage for src/couchers/servicers/communities.py: 84%
194 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func, or_
8from couchers.constants import COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD
9from couchers.crypto import decrypt_page_token, encrypt_page_token
10from couchers.db import can_moderate_node, get_node_parents_recursively, is_user_in_node_geography
11from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount
12from couchers.models import (
13 Cluster,
14 ClusterRole,
15 ClusterSubscription,
16 Discussion,
17 Event,
18 EventOccurrence,
19 Node,
20 Page,
21 PageType,
22 User,
23)
24from couchers.proto import communities_pb2, communities_pb2_grpc, groups_pb2
25from couchers.servicers.discussions import discussion_to_pb
26from couchers.servicers.events import event_to_pb
27from couchers.servicers.groups import group_to_pb
28from couchers.servicers.pages import page_to_pb
29from couchers.sql import couchers_select as select
30from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound applied to the requested page size in the paginated List* RPCs below;
# it is also the page size used when the request leaves page_size unset (0).
MAX_PAGINATION_LENGTH = 25
def _parents_to_pb(session, node_id):
    """Build the list of ``groups_pb2.Parent`` messages for a node.

    One message is produced per row returned by
    ``get_node_parents_recursively(session, node_id)``, in the order that helper returns them.
    Each row is unpacked as ``(node_id, parent_node_id, level, cluster)``; only the row's node id
    and cluster are used.
    """
    parent_pbs = []
    for ancestor_id, _parent_node_id, _level, ancestor_cluster in get_node_parents_recursively(session, node_id):
        community = groups_pb2.CommunityParent(
            community_id=ancestor_id,
            name=ancestor_cluster.name,
            slug=ancestor_cluster.slug,
            description=ancestor_cluster.description,
        )
        parent_pbs.append(groups_pb2.Parent(community=community))
    return parent_pbs
def communities_to_pb(session, nodes: list[Node], context):
    """Convert nodes into ``communities_pb2.Community`` messages, preserving input order.

    Member/admin counts are read from the ``ClusterSubscriptionCount`` / ``ClusterAdminCount``
    views for each node's official cluster; a cluster missing from a view is reported with a
    count of 1. The ``member`` / ``admin`` flags describe ``context.user_id``'s own subscription.

    Args:
        session: database session used for all lookups.
        nodes: the nodes to convert; may be empty.
        context: request context; ``context.user_id`` is the calling user.

    Returns:
        A list of ``communities_pb2.Community``, one per node. An empty ``nodes`` list yields
        ``[]`` without touching the database.
    """
    if not nodes:
        # Nothing to convert: avoid running the lookups below with empty IN () lists.
        return []

    can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes]

    official_clusters = [node.official_cluster for node in nodes]
    official_cluster_ids = [cluster.id for cluster in official_clusters]

    member_counts = dict(
        session.execute(
            select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where(
                ClusterSubscriptionCount.cluster_id.in_(official_cluster_ids)
            )
        ).all()
    )
    admin_counts = dict(
        session.execute(
            select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where(
                ClusterAdminCount.cluster_id.in_(official_cluster_ids)
            )
        ).all()
    )

    # A single query for the caller's subscriptions: admin-ships are the subset of those rows
    # whose role is admin, so there is no need to ask the database twice.
    own_subscriptions = session.execute(
        select(ClusterSubscription.cluster_id, ClusterSubscription.role)
        .where(ClusterSubscription.user_id == context.user_id)
        .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
    ).all()
    cluster_memberships = {cluster_id for cluster_id, _role in own_subscriptions}
    cluster_adminships = {cluster_id for cluster_id, role in own_subscriptions if role == ClusterRole.admin}

    return [
        communities_pb2.Community(
            community_id=node.id,
            name=official_cluster.name,
            slug=official_cluster.slug,
            description=official_cluster.description,
            created=Timestamp_from_datetime(node.created),
            parents=_parents_to_pb(session, node.id),
            member=official_cluster.id in cluster_memberships,
            admin=official_cluster.id in cluster_adminships,
            member_count=member_counts.get(official_cluster.id, 1),
            admin_count=admin_counts.get(official_cluster.id, 1),
            main_page=page_to_pb(session, official_cluster.main_page, context),
            can_moderate=can_moderate,
            discussions_enabled=official_cluster.discussions_enabled,
            events_enabled=official_cluster.events_enabled,
        )
        for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates)
    ]
def community_to_pb(session, node: Node, context):
    """Convert a single node into its ``communities_pb2.Community`` message.

    Thin wrapper around ``communities_to_pb`` for the one-node case.
    """
    converted = communities_to_pb(session, [node], context)
    return converted[0]
118class Communities(communities_pb2_grpc.CommunitiesServicer):
119 def GetCommunity(self, request, context, session):
120 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
121 if not node:
122 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
124 return community_to_pb(session, node, context)
126 def ListCommunities(self, request, context, session):
127 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
128 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0
129 nodes = (
130 session.execute(
131 select(Node)
132 .join(Cluster, Cluster.parent_node_id == Node.id)
133 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
134 .where(Cluster.is_official_cluster)
135 .order_by(Cluster.name)
136 .limit(page_size + 1)
137 .offset(offset)
138 )
139 .scalars()
140 .all()
141 )
142 return communities_pb2.ListCommunitiesRes(
143 communities=communities_to_pb(session, nodes[:page_size], context),
144 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None,
145 )
147 def SearchCommunities(self, request, context, session):
148 raw_query = request.query.strip()
149 if len(raw_query) < 3:
150 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "query_too_short")
152 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
154 word_similarity_score = func.word_similarity(func.unaccent(raw_query), func.immutable_unaccent(Cluster.name))
156 query = (
157 select(Node)
158 .join(Cluster, Cluster.parent_node_id == Node.id)
159 .where(Cluster.is_official_cluster)
160 .where(word_similarity_score > COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD)
161 .order_by(word_similarity_score.desc(), Cluster.name.asc(), Node.id.asc())
162 .limit(page_size)
163 )
165 rows = session.execute(query).scalars().all()
167 return communities_pb2.SearchCommunitiesRes(communities=communities_to_pb(session, rows, context))
169 def ListGroups(self, request, context, session):
170 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
171 next_cluster_id = int(request.page_token) if request.page_token else 0
172 clusters = (
173 session.execute(
174 select(Cluster)
175 .where(~Cluster.is_official_cluster) # not an official group
176 .where(Cluster.parent_node_id == request.community_id)
177 .where(Cluster.id >= next_cluster_id)
178 .order_by(Cluster.id)
179 .limit(page_size + 1)
180 )
181 .scalars()
182 .all()
183 )
184 return communities_pb2.ListGroupsRes(
185 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
186 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
187 )
189 def ListAdmins(self, request, context, session):
190 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
191 next_admin_id = int(request.page_token) if request.page_token else 0
192 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
193 if not node:
194 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
195 admins = (
196 session.execute(
197 select(User)
198 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
199 .where_users_visible(context)
200 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
201 .where(ClusterSubscription.role == ClusterRole.admin)
202 .where(User.id >= next_admin_id)
203 .order_by(User.id)
204 .limit(page_size + 1)
205 )
206 .scalars()
207 .all()
208 )
209 return communities_pb2.ListAdminsRes(
210 admin_user_ids=[admin.id for admin in admins[:page_size]],
211 next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
212 )
214 def AddAdmin(self, request, context, session):
215 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
216 if not node:
217 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
218 if not can_moderate_node(session, context.user_id, node.id):
219 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied")
221 user = session.execute(
222 select(User).where_users_visible(context).where(User.id == request.user_id)
223 ).scalar_one_or_none()
224 if not user:
225 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
227 subscription = session.execute(
228 select(ClusterSubscription)
229 .where(ClusterSubscription.user_id == user.id)
230 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
231 ).scalar_one_or_none()
232 if not subscription:
233 # Can't upgrade a member to admin if they're not already a member
234 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member")
235 if subscription.role == ClusterRole.admin:
236 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_admin")
238 subscription.role = ClusterRole.admin
240 return empty_pb2.Empty()
242 def RemoveAdmin(self, request, context, session):
243 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
244 if not node:
245 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
246 if not can_moderate_node(session, context.user_id, node.id):
247 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied")
249 user = session.execute(
250 select(User).where_users_visible(context).where(User.id == request.user_id)
251 ).scalar_one_or_none()
252 if not user:
253 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
255 subscription = session.execute(
256 select(ClusterSubscription)
257 .where(ClusterSubscription.user_id == user.id)
258 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
259 ).scalar_one_or_none()
260 if not subscription:
261 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member")
262 if subscription.role == ClusterRole.member:
263 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin")
265 subscription.role = ClusterRole.member
267 return empty_pb2.Empty()
269 def ListMembers(self, request, context, session):
270 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
271 next_member_id = int(request.page_token) if request.page_token else None
273 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
274 if not node:
275 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
277 query = (
278 select(User)
279 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
280 .where_users_visible(context)
281 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
282 )
283 if next_member_id is not None:
284 query = query.where(User.id <= next_member_id)
285 members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()
287 return communities_pb2.ListMembersRes(
288 member_user_ids=[member.id for member in members[:page_size]],
289 next_page_token=str(members[-1].id) if len(members) > page_size else None,
290 )
292 def ListNearbyUsers(self, request, context, session):
293 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
294 next_nearby_id = int(request.page_token) if request.page_token else 0
295 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
296 if not node:
297 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
298 nearbys = (
299 session.execute(
300 select(User)
301 .where_users_visible(context)
302 .where(func.ST_Contains(node.geom, User.geom))
303 .where(User.id >= next_nearby_id)
304 .order_by(User.id)
305 .limit(page_size + 1)
306 )
307 .scalars()
308 .all()
309 )
310 return communities_pb2.ListNearbyUsersRes(
311 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
312 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None,
313 )
315 def ListPlaces(self, request, context, session):
316 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
317 next_page_id = int(request.page_token) if request.page_token else 0
318 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
319 if not node:
320 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
321 places = (
322 node.official_cluster.owned_pages.where(Page.type == PageType.place)
323 .where(Page.id >= next_page_id)
324 .order_by(Page.id)
325 .limit(page_size + 1)
326 .all()
327 )
328 return communities_pb2.ListPlacesRes(
329 places=[page_to_pb(session, page, context) for page in places[:page_size]],
330 next_page_token=str(places[-1].id) if len(places) > page_size else None,
331 )
333 def ListGuides(self, request, context, session):
334 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
335 next_page_id = int(request.page_token) if request.page_token else 0
336 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
337 if not node:
338 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
339 guides = (
340 node.official_cluster.owned_pages.where(Page.type == PageType.guide)
341 .where(Page.id >= next_page_id)
342 .order_by(Page.id)
343 .limit(page_size + 1)
344 .all()
345 )
346 return communities_pb2.ListGuidesRes(
347 guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
348 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
349 )
351 def ListEvents(self, request, context, session):
352 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
353 # the page token is a unix timestamp of where we left off
354 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
356 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
357 if not node:
358 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
359 if not node.official_cluster.events_enabled:
360 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled")
362 if not request.include_parents:
363 nodes_clusters_to_search = [(node.id, node.official_cluster)]
364 else:
365 # the first value is the node_id, the last is the cluster (object)
366 nodes_clusters_to_search = [
367 (parent[0], parent[3]) for parent in get_node_parents_recursively(session, node.id)
368 ]
370 membership_clauses = []
371 for node_id, official_cluster_obj in nodes_clusters_to_search:
372 membership_clauses.append(Event.owner_cluster == official_cluster_obj)
373 membership_clauses.append(Event.parent_node_id == node_id)
375 # for communities, we list events owned by this community or for which this is a parent
376 occurrences = (
377 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*membership_clauses))
378 )
380 if request.past:
381 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
382 EventOccurrence.start_time.desc()
383 )
384 else:
385 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
386 EventOccurrence.start_time.asc()
387 )
389 occurrences = occurrences.limit(page_size + 1)
390 occurrences = session.execute(occurrences).scalars().all()
392 return communities_pb2.ListEventsRes(
393 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
394 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
395 )
397 def ListDiscussions(self, request, context, session):
398 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
399 next_page_id = int(request.page_token) if request.page_token else 0
400 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
401 if not node:
402 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
403 if not node.official_cluster.discussions_enabled:
404 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "discussions_not_enabled")
405 discussions = (
406 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
407 .order_by(Discussion.id.desc())
408 .limit(page_size + 1)
409 .all()
410 )
411 return communities_pb2.ListDiscussionsRes(
412 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
413 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
414 )
416 def JoinCommunity(self, request, context, session):
417 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
418 if not node:
419 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
421 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
422 if current_membership:
423 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_community")
425 node.official_cluster.cluster_subscriptions.append(
426 ClusterSubscription(
427 user_id=context.user_id,
428 role=ClusterRole.member,
429 )
430 )
432 return empty_pb2.Empty()
434 def LeaveCommunity(self, request, context, session):
435 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
436 if not node:
437 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
439 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
441 if not current_membership:
442 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_community")
444 if is_user_in_node_geography(session, context.user_id, node.id):
445 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cannot_leave_containing_community")
447 session.execute(
448 delete(ClusterSubscription)
449 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
450 .where(ClusterSubscription.user_id == context.user_id)
451 )
453 return empty_pb2.Empty()
455 def ListUserCommunities(self, request, context, session):
456 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
457 next_node_id = int(request.page_token) if request.page_token else 0
458 user_id = request.user_id or context.user_id
459 nodes = (
460 session.execute(
461 select(Node)
462 .join(Cluster, Cluster.parent_node_id == Node.id)
463 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
464 .where(ClusterSubscription.user_id == user_id)
465 .where(Cluster.is_official_cluster)
466 .where(Node.id >= next_node_id)
467 .order_by(Node.id)
468 .limit(page_size + 1)
469 )
470 .scalars()
471 .all()
472 )
474 return communities_pb2.ListUserCommunitiesRes(
475 communities=communities_to_pb(session, nodes[:page_size], context),
476 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
477 )
479 def ListAllCommunities(self, request, context, session):
480 """List all communities ordered hierarchically by parent-child relationships"""
481 # Get all nodes with their clusters, member counts, and user membership in a single query
482 results = session.execute(
483 select(
484 Node,
485 Cluster,
486 ClusterSubscriptionCount.count,
487 ClusterSubscription.cluster_id.label("user_subscription"),
488 )
489 .join(Cluster, Cluster.parent_node_id == Node.id)
490 .outerjoin(
491 ClusterSubscriptionCount,
492 ClusterSubscriptionCount.cluster_id == Cluster.id,
493 )
494 .outerjoin(
495 ClusterSubscription,
496 (ClusterSubscription.cluster_id == Cluster.id) & (ClusterSubscription.user_id == context.user_id),
497 )
498 .where(Cluster.is_official_cluster)
499 .order_by(Node.id)
500 ).all()
502 return communities_pb2.ListAllCommunitiesRes(
503 communities=[
504 communities_pb2.CommunitySummary(
505 community_id=node.id,
506 name=cluster.name,
507 slug=cluster.slug,
508 member=user_subscription is not None,
509 member_count=member_count or 1,
510 parents=_parents_to_pb(session, node.id),
511 created=Timestamp_from_datetime(node.created),
512 )
513 for node, cluster, member_count, user_subscription in results
514 ],
515 )