Coverage for src/couchers/servicers/communities.py: 83%
171 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-09 00:05 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-09 00:05 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func, or_
8from couchers import errors
9from couchers.crypto import decrypt_page_token, encrypt_page_token
10from couchers.db import can_moderate_node, get_node_parents_recursively
11from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts
12from couchers.models import (
13 Cluster,
14 ClusterRole,
15 ClusterSubscription,
16 Discussion,
17 Event,
18 EventOccurrence,
19 Node,
20 Page,
21 PageType,
22 User,
23)
24from couchers.servicers.discussions import discussion_to_pb
25from couchers.servicers.events import event_to_pb
26from couchers.servicers.groups import group_to_pb
27from couchers.servicers.pages import page_to_pb
28from couchers.sql import couchers_select as select
29from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
30from proto import communities_pb2, communities_pb2_grpc, groups_pb2
32logger = logging.getLogger(__name__)
34MAX_PAGINATION_LENGTH = 25
37def _parents_to_pb(session, node_id):
38 parents = get_node_parents_recursively(session, node_id)
39 return [
40 groups_pb2.Parent(
41 community=groups_pb2.CommunityParent(
42 community_id=node_id,
43 name=cluster.name,
44 slug=cluster.slug,
45 description=cluster.description,
46 )
47 )
48 for node_id, parent_node_id, level, cluster in parents
49 ]
52def communities_to_pb(session, nodes: list[Node], context):
53 can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes]
55 official_clusters = [node.official_cluster for node in nodes]
56 official_cluster_ids = [cluster.id for cluster in official_clusters]
58 member_counts = dict(
59 session.execute(
60 select(cluster_subscription_counts.c.cluster_id, cluster_subscription_counts.c.count).where(
61 cluster_subscription_counts.c.cluster_id.in_(official_cluster_ids)
62 )
63 ).all()
64 )
65 cluster_memberships = set(
66 session.execute(
67 select(ClusterSubscription.cluster_id)
68 .where(ClusterSubscription.user_id == context.user_id)
69 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
70 )
71 .scalars()
72 .all()
73 )
75 admin_counts = dict(
76 session.execute(
77 select(cluster_admin_counts.c.cluster_id, cluster_admin_counts.c.count).where(
78 cluster_admin_counts.c.cluster_id.in_(official_cluster_ids)
79 )
80 ).all()
81 )
82 cluster_adminships = set(
83 session.execute(
84 select(ClusterSubscription.cluster_id)
85 .where(ClusterSubscription.user_id == context.user_id)
86 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
87 .where(ClusterSubscription.role == ClusterRole.admin)
88 )
89 .scalars()
90 .all()
91 )
93 return [
94 communities_pb2.Community(
95 community_id=node.id,
96 name=official_cluster.name,
97 slug=official_cluster.slug,
98 description=official_cluster.description,
99 created=Timestamp_from_datetime(node.created),
100 parents=_parents_to_pb(session, node.id),
101 member=official_cluster.id in cluster_memberships,
102 admin=official_cluster.id in cluster_adminships,
103 member_count=member_counts.get(official_cluster.id, 1),
104 admin_count=admin_counts.get(official_cluster.id, 1),
105 main_page=page_to_pb(session, official_cluster.main_page, context),
106 can_moderate=can_moderate,
107 )
108 for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates)
109 ]
112def community_to_pb(session, node: Node, context):
113 return communities_to_pb(session, [node], context)[0]
116class Communities(communities_pb2_grpc.CommunitiesServicer):
117 def GetCommunity(self, request, context, session):
118 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
119 if not node:
120 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
122 return community_to_pb(session, node, context)
124 def ListCommunities(self, request, context, session):
125 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
126 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0
127 nodes = (
128 session.execute(
129 select(Node)
130 .join(Cluster, Cluster.parent_node_id == Node.id)
131 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
132 .where(Cluster.is_official_cluster)
133 .order_by(Cluster.name)
134 .limit(page_size + 1)
135 .offset(offset)
136 )
137 .scalars()
138 .all()
139 )
140 return communities_pb2.ListCommunitiesRes(
141 communities=communities_to_pb(session, nodes[:page_size], context),
142 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None,
143 )
145 def ListGroups(self, request, context, session):
146 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
147 next_cluster_id = int(request.page_token) if request.page_token else 0
148 clusters = (
149 session.execute(
150 select(Cluster)
151 .where(~Cluster.is_official_cluster) # not an official group
152 .where(Cluster.parent_node_id == request.community_id)
153 .where(Cluster.id >= next_cluster_id)
154 .order_by(Cluster.id)
155 .limit(page_size + 1)
156 )
157 .scalars()
158 .all()
159 )
160 return communities_pb2.ListGroupsRes(
161 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
162 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
163 )
165 def ListAdmins(self, request, context, session):
166 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
167 next_admin_id = int(request.page_token) if request.page_token else 0
168 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
169 if not node:
170 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
171 admins = (
172 session.execute(
173 select(User)
174 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
175 .where_users_visible(context)
176 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
177 .where(ClusterSubscription.role == ClusterRole.admin)
178 .where(User.id >= next_admin_id)
179 .order_by(User.id)
180 .limit(page_size + 1)
181 )
182 .scalars()
183 .all()
184 )
185 return communities_pb2.ListAdminsRes(
186 admin_user_ids=[admin.id for admin in admins[:page_size]],
187 next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
188 )
190 def AddAdmin(self, request, context, session):
191 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
192 if not node:
193 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
194 if not can_moderate_node(session, context.user_id, node.id):
195 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)
197 user = session.execute(
198 select(User).where_users_visible(context).where(User.id == request.user_id)
199 ).scalar_one_or_none()
200 if not user:
201 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
203 subscription = session.execute(
204 select(ClusterSubscription)
205 .where(ClusterSubscription.user_id == user.id)
206 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
207 ).scalar_one_or_none()
208 if not subscription:
209 # Can't upgrade a member to admin if they're not already a member
210 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
211 if subscription.role == ClusterRole.admin:
212 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN)
214 subscription.role = ClusterRole.admin
216 return empty_pb2.Empty()
218 def RemoveAdmin(self, request, context, session):
219 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
220 if not node:
221 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
222 if not can_moderate_node(session, context.user_id, node.id):
223 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)
225 user = session.execute(
226 select(User).where_users_visible(context).where(User.id == request.user_id)
227 ).scalar_one_or_none()
228 if not user:
229 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
231 subscription = session.execute(
232 select(ClusterSubscription)
233 .where(ClusterSubscription.user_id == user.id)
234 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
235 ).scalar_one_or_none()
236 if not subscription:
237 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
238 if subscription.role == ClusterRole.member:
239 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)
241 subscription.role = ClusterRole.member
243 return empty_pb2.Empty()
245 def ListMembers(self, request, context, session):
246 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
247 next_member_id = int(request.page_token) if request.page_token else None
249 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
250 if not node:
251 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
253 query = (
254 select(User)
255 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
256 .where_users_visible(context)
257 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
258 )
259 if next_member_id is not None:
260 query = query.where(User.id <= next_member_id)
261 members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()
263 return communities_pb2.ListMembersRes(
264 member_user_ids=[member.id for member in members[:page_size]],
265 next_page_token=str(members[-1].id) if len(members) > page_size else None,
266 )
268 def ListNearbyUsers(self, request, context, session):
269 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
270 next_nearby_id = int(request.page_token) if request.page_token else 0
271 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
272 if not node:
273 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
274 nearbys = (
275 session.execute(
276 select(User)
277 .where_users_visible(context)
278 .where(func.ST_Contains(node.geom, User.geom))
279 .where(User.id >= next_nearby_id)
280 .order_by(User.id)
281 .limit(page_size + 1)
282 )
283 .scalars()
284 .all()
285 )
286 return communities_pb2.ListNearbyUsersRes(
287 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
288 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None,
289 )
291 def ListPlaces(self, request, context, session):
292 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
293 next_page_id = int(request.page_token) if request.page_token else 0
294 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
295 if not node:
296 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
297 places = (
298 node.official_cluster.owned_pages.where(Page.type == PageType.place)
299 .where(Page.id >= next_page_id)
300 .order_by(Page.id)
301 .limit(page_size + 1)
302 .all()
303 )
304 return communities_pb2.ListPlacesRes(
305 places=[page_to_pb(session, page, context) for page in places[:page_size]],
306 next_page_token=str(places[-1].id) if len(places) > page_size else None,
307 )
309 def ListGuides(self, request, context, session):
310 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
311 next_page_id = int(request.page_token) if request.page_token else 0
312 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
313 if not node:
314 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
315 guides = (
316 node.official_cluster.owned_pages.where(Page.type == PageType.guide)
317 .where(Page.id >= next_page_id)
318 .order_by(Page.id)
319 .limit(page_size + 1)
320 .all()
321 )
322 return communities_pb2.ListGuidesRes(
323 guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
324 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
325 )
327 def ListEvents(self, request, context, session):
328 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
329 # the page token is a unix timestamp of where we left off
330 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
332 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
333 if not node:
334 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
336 # for communities, we list events owned by this community or for which this is a parent
337 occurrences = (
338 select(EventOccurrence)
339 .join(Event, Event.id == EventOccurrence.event_id)
340 .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node))
341 )
343 if request.past:
344 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
345 EventOccurrence.start_time.desc()
346 )
347 else:
348 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
349 EventOccurrence.start_time.asc()
350 )
352 occurrences = occurrences.limit(page_size + 1)
353 occurrences = session.execute(occurrences).scalars().all()
355 return communities_pb2.ListEventsRes(
356 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
357 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
358 )
360 def ListDiscussions(self, request, context, session):
361 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
362 next_page_id = int(request.page_token) if request.page_token else 0
363 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
364 if not node:
365 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
366 discussions = (
367 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
368 .order_by(Discussion.id.desc())
369 .limit(page_size + 1)
370 .all()
371 )
372 return communities_pb2.ListDiscussionsRes(
373 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
374 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
375 )
377 def JoinCommunity(self, request, context, session):
378 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
379 if not node:
380 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
382 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
383 if current_membership:
384 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)
386 node.official_cluster.cluster_subscriptions.append(
387 ClusterSubscription(
388 user_id=context.user_id,
389 role=ClusterRole.member,
390 )
391 )
393 return empty_pb2.Empty()
395 def LeaveCommunity(self, request, context, session):
396 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
397 if not node:
398 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
400 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
402 if not current_membership:
403 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)
405 if context.user_id in node.contained_user_ids:
406 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)
408 session.execute(
409 delete(ClusterSubscription)
410 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
411 .where(ClusterSubscription.user_id == context.user_id)
412 )
414 return empty_pb2.Empty()
416 def ListUserCommunities(self, request, context, session):
417 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
418 next_node_id = int(request.page_token) if request.page_token else 0
419 user_id = request.user_id or context.user_id
420 nodes = (
421 session.execute(
422 select(Node)
423 .join(Cluster, Cluster.parent_node_id == Node.id)
424 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
425 .where(ClusterSubscription.user_id == user_id)
426 .where(Cluster.is_official_cluster)
427 .where(Node.id >= next_node_id)
428 .order_by(Node.id)
429 .limit(page_size + 1)
430 )
431 .scalars()
432 .all()
433 )
435 return communities_pb2.ListUserCommunitiesRes(
436 communities=communities_to_pb(session, nodes[:page_size], context),
437 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
438 )