Coverage for src/couchers/servicers/communities.py: 83%
192 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 20:25 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 20:25 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func, or_
8from couchers import errors
9from couchers.constants import COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD
10from couchers.crypto import decrypt_page_token, encrypt_page_token
11from couchers.db import can_moderate_node, get_node_parents_recursively
12from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount
13from couchers.models import (
14 Cluster,
15 ClusterRole,
16 ClusterSubscription,
17 Discussion,
18 Event,
19 EventOccurrence,
20 Node,
21 Page,
22 PageType,
23 User,
24)
25from couchers.servicers.discussions import discussion_to_pb
26from couchers.servicers.events import event_to_pb
27from couchers.servicers.groups import group_to_pb
28from couchers.servicers.pages import page_to_pb
29from couchers.sql import couchers_select as select
30from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
31from proto import communities_pb2, communities_pb2_grpc, groups_pb2
33logger = logging.getLogger(__name__)
35MAX_PAGINATION_LENGTH = 25
38def _parents_to_pb(session, node_id):
39 parents = get_node_parents_recursively(session, node_id)
40 return [
41 groups_pb2.Parent(
42 community=groups_pb2.CommunityParent(
43 community_id=node_id,
44 name=cluster.name,
45 slug=cluster.slug,
46 description=cluster.description,
47 )
48 )
49 for node_id, parent_node_id, level, cluster in parents
50 ]
53def communities_to_pb(session, nodes: list[Node], context):
54 can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes]
56 official_clusters = [node.official_cluster for node in nodes]
57 official_cluster_ids = [cluster.id for cluster in official_clusters]
59 member_counts = dict(
60 session.execute(
61 select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where(
62 ClusterSubscriptionCount.cluster_id.in_(official_cluster_ids)
63 )
64 ).all()
65 )
66 cluster_memberships = set(
67 session.execute(
68 select(ClusterSubscription.cluster_id)
69 .where(ClusterSubscription.user_id == context.user_id)
70 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
71 )
72 .scalars()
73 .all()
74 )
76 admin_counts = dict(
77 session.execute(
78 select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where(
79 ClusterAdminCount.cluster_id.in_(official_cluster_ids)
80 )
81 ).all()
82 )
83 cluster_adminships = set(
84 session.execute(
85 select(ClusterSubscription.cluster_id)
86 .where(ClusterSubscription.user_id == context.user_id)
87 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
88 .where(ClusterSubscription.role == ClusterRole.admin)
89 )
90 .scalars()
91 .all()
92 )
94 return [
95 communities_pb2.Community(
96 community_id=node.id,
97 name=official_cluster.name,
98 slug=official_cluster.slug,
99 description=official_cluster.description,
100 created=Timestamp_from_datetime(node.created),
101 parents=_parents_to_pb(session, node.id),
102 member=official_cluster.id in cluster_memberships,
103 admin=official_cluster.id in cluster_adminships,
104 member_count=member_counts.get(official_cluster.id, 1),
105 admin_count=admin_counts.get(official_cluster.id, 1),
106 main_page=page_to_pb(session, official_cluster.main_page, context),
107 can_moderate=can_moderate,
108 discussions_enabled=official_cluster.discussions_enabled,
109 events_enabled=official_cluster.events_enabled,
110 )
111 for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates)
112 ]
115def community_to_pb(session, node: Node, context):
116 return communities_to_pb(session, [node], context)[0]
119class Communities(communities_pb2_grpc.CommunitiesServicer):
120 def GetCommunity(self, request, context, session):
121 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
122 if not node:
123 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
125 return community_to_pb(session, node, context)
127 def ListCommunities(self, request, context, session):
128 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
129 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0
130 nodes = (
131 session.execute(
132 select(Node)
133 .join(Cluster, Cluster.parent_node_id == Node.id)
134 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
135 .where(Cluster.is_official_cluster)
136 .order_by(Cluster.name)
137 .limit(page_size + 1)
138 .offset(offset)
139 )
140 .scalars()
141 .all()
142 )
143 return communities_pb2.ListCommunitiesRes(
144 communities=communities_to_pb(session, nodes[:page_size], context),
145 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None,
146 )
148 def SearchCommunities(self, request, context, session):
149 raw_query = request.query.strip()
150 if len(raw_query) < 3:
151 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.QUERY_TOO_SHORT)
153 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
155 word_similarity_score = func.word_similarity(func.unaccent(raw_query), func.immutable_unaccent(Cluster.name))
157 query = (
158 select(Node)
159 .join(Cluster, Cluster.parent_node_id == Node.id)
160 .where(Cluster.is_official_cluster)
161 .where(word_similarity_score > COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD)
162 .order_by(word_similarity_score.desc(), Cluster.name.asc(), Node.id.asc())
163 .limit(page_size)
164 )
166 rows = session.execute(query).scalars().all()
168 return communities_pb2.SearchCommunitiesRes(communities=communities_to_pb(session, rows, context))
170 def ListGroups(self, request, context, session):
171 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
172 next_cluster_id = int(request.page_token) if request.page_token else 0
173 clusters = (
174 session.execute(
175 select(Cluster)
176 .where(~Cluster.is_official_cluster) # not an official group
177 .where(Cluster.parent_node_id == request.community_id)
178 .where(Cluster.id >= next_cluster_id)
179 .order_by(Cluster.id)
180 .limit(page_size + 1)
181 )
182 .scalars()
183 .all()
184 )
185 return communities_pb2.ListGroupsRes(
186 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
187 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
188 )
190 def ListAdmins(self, request, context, session):
191 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
192 next_admin_id = int(request.page_token) if request.page_token else 0
193 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
194 if not node:
195 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
196 admins = (
197 session.execute(
198 select(User)
199 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
200 .where_users_visible(context)
201 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
202 .where(ClusterSubscription.role == ClusterRole.admin)
203 .where(User.id >= next_admin_id)
204 .order_by(User.id)
205 .limit(page_size + 1)
206 )
207 .scalars()
208 .all()
209 )
210 return communities_pb2.ListAdminsRes(
211 admin_user_ids=[admin.id for admin in admins[:page_size]],
212 next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
213 )
215 def AddAdmin(self, request, context, session):
216 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
217 if not node:
218 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
219 if not can_moderate_node(session, context.user_id, node.id):
220 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)
222 user = session.execute(
223 select(User).where_users_visible(context).where(User.id == request.user_id)
224 ).scalar_one_or_none()
225 if not user:
226 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
228 subscription = session.execute(
229 select(ClusterSubscription)
230 .where(ClusterSubscription.user_id == user.id)
231 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
232 ).scalar_one_or_none()
233 if not subscription:
234 # Can't upgrade a member to admin if they're not already a member
235 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
236 if subscription.role == ClusterRole.admin:
237 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN)
239 subscription.role = ClusterRole.admin
241 return empty_pb2.Empty()
243 def RemoveAdmin(self, request, context, session):
244 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
245 if not node:
246 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
247 if not can_moderate_node(session, context.user_id, node.id):
248 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)
250 user = session.execute(
251 select(User).where_users_visible(context).where(User.id == request.user_id)
252 ).scalar_one_or_none()
253 if not user:
254 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
256 subscription = session.execute(
257 select(ClusterSubscription)
258 .where(ClusterSubscription.user_id == user.id)
259 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
260 ).scalar_one_or_none()
261 if not subscription:
262 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
263 if subscription.role == ClusterRole.member:
264 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)
266 subscription.role = ClusterRole.member
268 return empty_pb2.Empty()
270 def ListMembers(self, request, context, session):
271 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
272 next_member_id = int(request.page_token) if request.page_token else None
274 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
275 if not node:
276 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
278 query = (
279 select(User)
280 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
281 .where_users_visible(context)
282 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
283 )
284 if next_member_id is not None:
285 query = query.where(User.id <= next_member_id)
286 members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()
288 return communities_pb2.ListMembersRes(
289 member_user_ids=[member.id for member in members[:page_size]],
290 next_page_token=str(members[-1].id) if len(members) > page_size else None,
291 )
293 def ListNearbyUsers(self, request, context, session):
294 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
295 next_nearby_id = int(request.page_token) if request.page_token else 0
296 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
297 if not node:
298 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
299 nearbys = (
300 session.execute(
301 select(User)
302 .where_users_visible(context)
303 .where(func.ST_Contains(node.geom, User.geom))
304 .where(User.id >= next_nearby_id)
305 .order_by(User.id)
306 .limit(page_size + 1)
307 )
308 .scalars()
309 .all()
310 )
311 return communities_pb2.ListNearbyUsersRes(
312 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
313 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None,
314 )
316 def ListPlaces(self, request, context, session):
317 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
318 next_page_id = int(request.page_token) if request.page_token else 0
319 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
320 if not node:
321 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
322 places = (
323 node.official_cluster.owned_pages.where(Page.type == PageType.place)
324 .where(Page.id >= next_page_id)
325 .order_by(Page.id)
326 .limit(page_size + 1)
327 .all()
328 )
329 return communities_pb2.ListPlacesRes(
330 places=[page_to_pb(session, page, context) for page in places[:page_size]],
331 next_page_token=str(places[-1].id) if len(places) > page_size else None,
332 )
334 def ListGuides(self, request, context, session):
335 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
336 next_page_id = int(request.page_token) if request.page_token else 0
337 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
338 if not node:
339 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
340 guides = (
341 node.official_cluster.owned_pages.where(Page.type == PageType.guide)
342 .where(Page.id >= next_page_id)
343 .order_by(Page.id)
344 .limit(page_size + 1)
345 .all()
346 )
347 return communities_pb2.ListGuidesRes(
348 guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
349 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
350 )
352 def ListEvents(self, request, context, session):
353 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
354 # the page token is a unix timestamp of where we left off
355 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
357 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
358 if not node:
359 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
360 if not node.official_cluster.events_enabled:
361 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENTS_NOT_ENABLED)
363 if not request.include_parents:
364 nodes_clusters_to_search = [(node.id, node.official_cluster)]
365 else:
366 # the first value is the node_id, the last is the cluster (object)
367 nodes_clusters_to_search = [
368 (parent[0], parent[3]) for parent in get_node_parents_recursively(session, node.id)
369 ]
371 membership_clauses = []
372 for node_id, official_cluster_obj in nodes_clusters_to_search:
373 membership_clauses.append(Event.owner_cluster == official_cluster_obj)
374 membership_clauses.append(Event.parent_node_id == node_id)
376 # for communities, we list events owned by this community or for which this is a parent
377 occurrences = (
378 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*membership_clauses))
379 )
381 if request.past:
382 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
383 EventOccurrence.start_time.desc()
384 )
385 else:
386 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
387 EventOccurrence.start_time.asc()
388 )
390 occurrences = occurrences.limit(page_size + 1)
391 occurrences = session.execute(occurrences).scalars().all()
393 return communities_pb2.ListEventsRes(
394 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
395 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
396 )
398 def ListDiscussions(self, request, context, session):
399 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
400 next_page_id = int(request.page_token) if request.page_token else 0
401 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
402 if not node:
403 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
404 if not node.official_cluster.discussions_enabled:
405 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.DISCUSSIONS_NOT_ENABLED)
406 discussions = (
407 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
408 .order_by(Discussion.id.desc())
409 .limit(page_size + 1)
410 .all()
411 )
412 return communities_pb2.ListDiscussionsRes(
413 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
414 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
415 )
417 def JoinCommunity(self, request, context, session):
418 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
419 if not node:
420 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
422 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
423 if current_membership:
424 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)
426 node.official_cluster.cluster_subscriptions.append(
427 ClusterSubscription(
428 user_id=context.user_id,
429 role=ClusterRole.member,
430 )
431 )
433 return empty_pb2.Empty()
435 def LeaveCommunity(self, request, context, session):
436 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
437 if not node:
438 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
440 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
442 if not current_membership:
443 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)
445 if context.user_id in node.contained_user_ids:
446 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)
448 session.execute(
449 delete(ClusterSubscription)
450 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
451 .where(ClusterSubscription.user_id == context.user_id)
452 )
454 return empty_pb2.Empty()
456 def ListUserCommunities(self, request, context, session):
457 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
458 next_node_id = int(request.page_token) if request.page_token else 0
459 user_id = request.user_id or context.user_id
460 nodes = (
461 session.execute(
462 select(Node)
463 .join(Cluster, Cluster.parent_node_id == Node.id)
464 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
465 .where(ClusterSubscription.user_id == user_id)
466 .where(Cluster.is_official_cluster)
467 .where(Node.id >= next_node_id)
468 .order_by(Node.id)
469 .limit(page_size + 1)
470 )
471 .scalars()
472 .all()
473 )
475 return communities_pb2.ListUserCommunitiesRes(
476 communities=communities_to_pb(session, nodes[:page_size], context),
477 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
478 )