Coverage for src/couchers/servicers/communities.py: 82%
182 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-09-14 15:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-09-14 15:31 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func, or_
8from couchers import errors
9from couchers.crypto import decrypt_page_token, encrypt_page_token
10from couchers.db import can_moderate_node, get_node_parents_recursively
11from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount
12from couchers.models import (
13 Cluster,
14 ClusterRole,
15 ClusterSubscription,
16 Discussion,
17 Event,
18 EventOccurrence,
19 Node,
20 Page,
21 PageType,
22 User,
23)
24from couchers.servicers.discussions import discussion_to_pb
25from couchers.servicers.events import event_to_pb
26from couchers.servicers.groups import group_to_pb
27from couchers.servicers.pages import page_to_pb
28from couchers.sql import couchers_select as select
29from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
30from proto import communities_pb2, communities_pb2_grpc, groups_pb2
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Largest page size any list RPC in this module will return; it is also the
# page size used when a request leaves page_size unset (0).
MAX_PAGINATION_LENGTH = 25
def _parents_to_pb(session, node_id):
    """Build the list of ``groups_pb2.Parent`` messages for a node.

    One message is produced per row returned by
    ``get_node_parents_recursively(session, node_id)``, in the order the rows
    are returned. Each row is unpacked as
    ``(node id, parent node id, level, cluster)``; only the node id and the
    cluster's ``name``, ``slug`` and ``description`` are used.
    """
    ancestry = []
    for ancestor_id, _parent_node_id, _level, cluster in get_node_parents_recursively(session, node_id):
        community = groups_pb2.CommunityParent(
            community_id=ancestor_id,
            name=cluster.name,
            slug=cluster.slug,
            description=cluster.description,
        )
        ancestry.append(groups_pb2.Parent(community=community))
    return ancestry
def communities_to_pb(session, nodes: list[Node], context):
    """Convert a list of nodes into ``communities_pb2.Community`` messages.

    Member/admin counts and the calling user's own membership/adminship are
    fetched with one query each for the whole batch. Moderation rights,
    parents and the main page are resolved once per node.

    Args:
        session: database session used for every query.
        nodes: nodes to convert; each must expose ``official_cluster``.
        context: request context; ``context.user_id`` identifies the viewer.

    Returns:
        A list with one ``Community`` per input node, in input order.
    """
    viewer_id = context.user_id

    moderation_flags = [can_moderate_node(session, viewer_id, node.id) for node in nodes]

    clusters = [node.official_cluster for node in nodes]
    cluster_ids = [cluster.id for cluster in clusters]

    # cluster id -> number of subscriptions, for the whole batch at once
    member_count_query = select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where(
        ClusterSubscriptionCount.cluster_id.in_(cluster_ids)
    )
    member_count_by_cluster = {
        cluster_id: count for cluster_id, count in session.execute(member_count_query).all()
    }

    # ids of the clusters (within this batch) that the viewer is subscribed to
    viewer_subscriptions = (
        select(ClusterSubscription.cluster_id)
        .where(ClusterSubscription.user_id == viewer_id)
        .where(ClusterSubscription.cluster_id.in_(cluster_ids))
    )
    joined_cluster_ids = {cluster_id for cluster_id in session.execute(viewer_subscriptions).scalars().all()}

    # cluster id -> number of admins, for the whole batch at once
    admin_count_query = select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where(
        ClusterAdminCount.cluster_id.in_(cluster_ids)
    )
    admin_count_by_cluster = {
        cluster_id: count for cluster_id, count in session.execute(admin_count_query).all()
    }

    # same as the membership query, narrowed to subscriptions with the admin role
    viewer_adminships = viewer_subscriptions.where(ClusterSubscription.role == ClusterRole.admin)
    administered_cluster_ids = {cluster_id for cluster_id in session.execute(viewer_adminships).scalars().all()}

    communities = []
    for node, cluster, can_moderate in zip(nodes, clusters, moderation_flags):
        communities.append(
            communities_pb2.Community(
                community_id=node.id,
                name=cluster.name,
                slug=cluster.slug,
                description=cluster.description,
                created=Timestamp_from_datetime(node.created),
                parents=_parents_to_pb(session, node.id),
                member=cluster.id in joined_cluster_ids,
                admin=cluster.id in administered_cluster_ids,
                # a cluster with no row in the count query result is reported as 1
                member_count=member_count_by_cluster.get(cluster.id, 1),
                admin_count=admin_count_by_cluster.get(cluster.id, 1),
                main_page=page_to_pb(session, cluster.main_page, context),
                can_moderate=can_moderate,
                discussions_enabled=cluster.discussions_enabled,
                events_enabled=cluster.events_enabled,
            )
        )
    return communities
def community_to_pb(session, node: Node, context):
    """Convert a single node into a ``communities_pb2.Community`` message.

    Thin wrapper that runs ``communities_to_pb`` on a one-element list and
    returns its only result.
    """
    converted = communities_to_pb(session, [node], context)
    return converted[0]
def _requested_page_size(request):
    """Return ``request.page_size`` capped at, and defaulting to, ``MAX_PAGINATION_LENGTH``."""
    return min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)


def _get_node_or_abort(session, context, community_id):
    """Load the ``Node`` with the given id, aborting the call with NOT_FOUND if there is none."""
    lookup = select(Node).where(Node.id == community_id)
    node = session.execute(lookup).scalar_one_or_none()
    if not node:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    return node


def _next_id_token(rows, page_size):
    """Return ``str(rows[-1].id)`` when more than ``page_size`` rows were fetched, else ``None``.

    Callers fetch ``page_size + 1`` rows, so the last row is the first one
    that belongs to the following page.
    """
    if len(rows) > page_size:
        return str(rows[-1].id)
    return None


def _subscription_for_role_change(request, context, session):
    """Run the checks shared by AddAdmin and RemoveAdmin and return the target's subscription.

    Aborts with NOT_FOUND for an unknown community or user, and with
    FAILED_PRECONDITION when the caller may not moderate the node or the
    target user has no subscription to the community's official cluster.
    """
    node = _get_node_or_abort(session, context, request.community_id)
    if not can_moderate_node(session, context.user_id, node.id):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)

    user_lookup = select(User).where_users_visible(context).where(User.id == request.user_id)
    user = session.execute(user_lookup).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    subscription_lookup = (
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == user.id)
        .where(ClusterSubscription.cluster_id == node.official_cluster.id)
    )
    subscription = session.execute(subscription_lookup).scalar_one_or_none()
    if not subscription:
        # A role can only be changed for someone who is already a member
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
    return subscription


class Communities(communities_pb2_grpc.CommunitiesServicer):
    """Servicer for the Communities gRPC API.

    Every handler receives the request message, the call context and a
    database session. A community is addressed by ``request.community_id``,
    which is looked up as a ``Node`` id; most per-community data hangs off
    that node's ``official_cluster``.

    List handlers fetch ``page_size + 1`` rows so they can tell whether a
    further page exists, and return at most ``page_size`` of them.
    """

    def GetCommunity(self, request, context, session):
        """Return the community for ``request.community_id``; NOT_FOUND if it does not exist."""
        node = _get_node_or_abort(session, context, request.community_id)
        return community_to_pb(session, node, context)

    def ListCommunities(self, request, context, session):
        """List communities whose parent node is ``request.community_id``, ordered by cluster name.

        A ``community_id`` of 0 disables the parent filter. Paging is offset
        based: the page token is an encrypted offset.
        """
        page_size = _requested_page_size(request)
        if request.page_token:
            offset = int(decrypt_page_token(request.page_token))
        else:
            offset = 0

        statement = (
            select(Node)
            .join(Cluster, Cluster.parent_node_id == Node.id)
            .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
            .where(Cluster.is_official_cluster)
            .order_by(Cluster.name)
            .limit(page_size + 1)
            .offset(offset)
        )
        nodes = session.execute(statement).scalars().all()

        communities = communities_to_pb(session, nodes[:page_size], context)
        has_more = len(nodes) > page_size
        return communities_pb2.ListCommunitiesRes(
            communities=communities,
            next_page_token=encrypt_page_token(str(offset + page_size)) if has_more else None,
        )

    def ListGroups(self, request, context, session):
        """List the non-official clusters whose parent node is ``request.community_id``.

        Results are ordered by ascending cluster id; the page token is the
        smallest cluster id to include.
        """
        page_size = _requested_page_size(request)
        first_cluster_id = int(request.page_token) if request.page_token else 0

        statement = (
            select(Cluster)
            .where(~Cluster.is_official_cluster)  # not an official group
            .where(Cluster.parent_node_id == request.community_id)
            .where(Cluster.id >= first_cluster_id)
            .order_by(Cluster.id)
            .limit(page_size + 1)
        )
        clusters = session.execute(statement).scalars().all()

        return communities_pb2.ListGroupsRes(
            groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
            next_page_token=_next_id_token(clusters, page_size),
        )

    def ListAdmins(self, request, context, session):
        """List ids of visible users who hold the admin role in the community's official cluster.

        Results are ordered by ascending user id; the page token is the
        smallest user id to include.
        """
        page_size = _requested_page_size(request)
        first_admin_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)

        statement = (
            select(User)
            .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
            .where_users_visible(context)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
            .where(ClusterSubscription.role == ClusterRole.admin)
            .where(User.id >= first_admin_id)
            .order_by(User.id)
            .limit(page_size + 1)
        )
        admins = session.execute(statement).scalars().all()

        return communities_pb2.ListAdminsRes(
            admin_user_ids=[admin.id for admin in admins[:page_size]],
            next_page_token=_next_id_token(admins, page_size),
        )

    def AddAdmin(self, request, context, session):
        """Promote ``request.user_id`` to admin of the community.

        Besides the shared checks (community and user exist, caller may
        moderate, target is a member), aborts with FAILED_PRECONDITION if the
        target is already an admin.
        """
        subscription = _subscription_for_role_change(request, context, session)
        if subscription.role == ClusterRole.admin:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN)

        subscription.role = ClusterRole.admin

        return empty_pb2.Empty()

    def RemoveAdmin(self, request, context, session):
        """Demote ``request.user_id`` from admin to plain member of the community.

        Besides the shared checks (community and user exist, caller may
        moderate, target is a member), aborts with FAILED_PRECONDITION if the
        target's role is already ``member``.
        """
        subscription = _subscription_for_role_change(request, context, session)
        if subscription.role == ClusterRole.member:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

        subscription.role = ClusterRole.member

        return empty_pb2.Empty()

    def ListMembers(self, request, context, session):
        """List ids of visible users subscribed to the community's official cluster.

        Results are ordered by descending user id; the page token, when
        present, is the largest user id to include.
        """
        page_size = _requested_page_size(request)
        last_member_id = int(request.page_token) if request.page_token else None

        node = _get_node_or_abort(session, context, request.community_id)

        statement = (
            select(User)
            .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
            .where_users_visible(context)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
        )
        if last_member_id is not None:
            statement = statement.where(User.id <= last_member_id)
        statement = statement.order_by(User.id.desc()).limit(page_size + 1)
        members = session.execute(statement).scalars().all()

        return communities_pb2.ListMembersRes(
            member_user_ids=[member.id for member in members[:page_size]],
            next_page_token=_next_id_token(members, page_size),
        )

    def ListNearbyUsers(self, request, context, session):
        """List ids of visible users whose ``geom`` is contained in the community node's ``geom``.

        Results are ordered by ascending user id; the page token is the
        smallest user id to include.
        """
        page_size = _requested_page_size(request)
        first_user_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)

        statement = (
            select(User)
            .where_users_visible(context)
            .where(func.ST_Contains(node.geom, User.geom))
            .where(User.id >= first_user_id)
            .order_by(User.id)
            .limit(page_size + 1)
        )
        nearby_users = session.execute(statement).scalars().all()

        return communities_pb2.ListNearbyUsersRes(
            nearby_user_ids=[user.id for user in nearby_users[:page_size]],
            next_page_token=_next_id_token(nearby_users, page_size),
        )

    def ListPlaces(self, request, context, session):
        """List pages of type ``place`` owned by the community's official cluster.

        Results are ordered by ascending page id; the page token is the
        smallest page id to include.
        """
        page_size = _requested_page_size(request)
        first_page_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)

        owned_pages = node.official_cluster.owned_pages
        places = (
            owned_pages.where(Page.type == PageType.place)
            .where(Page.id >= first_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)
            .all()
        )

        return communities_pb2.ListPlacesRes(
            places=[page_to_pb(session, place, context) for place in places[:page_size]],
            next_page_token=_next_id_token(places, page_size),
        )

    def ListGuides(self, request, context, session):
        """List pages of type ``guide`` owned by the community's official cluster.

        Results are ordered by ascending page id; the page token is the
        smallest page id to include.
        """
        page_size = _requested_page_size(request)
        first_page_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)

        owned_pages = node.official_cluster.owned_pages
        guides = (
            owned_pages.where(Page.type == PageType.guide)
            .where(Page.id >= first_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)
            .all()
        )

        return communities_pb2.ListGuidesRes(
            guides=[page_to_pb(session, guide, context) for guide in guides[:page_size]],
            next_page_token=_next_id_token(guides, page_size),
        )

    def ListEvents(self, request, context, session):
        """List event occurrences belonging to the community, optionally including its parents.

        Aborts with FAILED_PRECONDITION when events are disabled for the
        community. With ``request.past`` set, occurrences that ended before
        the cursor are returned latest start first; otherwise occurrences
        ending after the cursor are returned earliest start first.
        """
        page_size = _requested_page_size(request)
        # the page token is a unix timestamp of where we left off
        if request.page_token:
            cursor = dt_from_millis(int(request.page_token))
        else:
            cursor = now()

        node = _get_node_or_abort(session, context, request.community_id)
        if not node.official_cluster.events_enabled:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENTS_NOT_ENABLED)

        if request.include_parents:
            # in each row the first value is the node_id, the last is the cluster (object)
            search_scope = [(row[0], row[3]) for row in get_node_parents_recursively(session, node.id)]
        else:
            search_scope = [(node.id, node.official_cluster)]

        # for communities, we list events owned by this community or for which this is a parent
        ownership_clauses = []
        for scoped_node_id, scoped_cluster in search_scope:
            ownership_clauses.extend(
                (
                    Event.owner_cluster == scoped_cluster,
                    Event.parent_node_id == scoped_node_id,
                )
            )

        statement = (
            select(EventOccurrence)
            .join(Event, Event.id == EventOccurrence.event_id)
            .where(or_(*ownership_clauses))
        )

        # one second of slack around the cursor, so the occurrence the token
        # was taken from is matched again
        slack = timedelta(seconds=1)
        if request.past:
            statement = statement.where(EventOccurrence.end_time < cursor + slack).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            statement = statement.where(EventOccurrence.end_time > cursor - slack).order_by(
                EventOccurrence.start_time.asc()
            )

        occurrences = session.execute(statement.limit(page_size + 1)).scalars().all()

        events = [event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]]
        if len(occurrences) > page_size:
            next_page_token = str(millis_from_dt(occurrences[-1].end_time))
        else:
            next_page_token = None
        return communities_pb2.ListEventsRes(events=events, next_page_token=next_page_token)

    def ListDiscussions(self, request, context, session):
        """List discussions owned by the community's official cluster, newest id first.

        Aborts with FAILED_PRECONDITION when discussions are disabled for the
        community. The page token is the largest discussion id to include;
        0 (no token) means no upper bound.
        """
        page_size = _requested_page_size(request)
        last_discussion_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)
        if not node.official_cluster.discussions_enabled:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.DISCUSSIONS_NOT_ENABLED)

        within_page = or_(Discussion.id <= last_discussion_id, last_discussion_id == 0)
        discussions = (
            node.official_cluster.owned_discussions.where(within_page)
            .order_by(Discussion.id.desc())
            .limit(page_size + 1)
            .all()
        )

        return communities_pb2.ListDiscussionsRes(
            discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
            next_page_token=_next_id_token(discussions, page_size),
        )

    def JoinCommunity(self, request, context, session):
        """Subscribe the calling user to the community as a plain member.

        Aborts with FAILED_PRECONDITION if the user is already a member.
        """
        node = _get_node_or_abort(session, context, request.community_id)
        official_cluster = node.official_cluster

        existing_member = official_cluster.members.where(User.id == context.user_id).one_or_none()
        if existing_member:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)

        new_subscription = ClusterSubscription(
            user_id=context.user_id,
            role=ClusterRole.member,
        )
        official_cluster.cluster_subscriptions.append(new_subscription)

        return empty_pb2.Empty()

    def LeaveCommunity(self, request, context, session):
        """Remove the calling user's subscription to the community.

        Aborts with FAILED_PRECONDITION if the user is not a member, or if
        the user's id is listed in the node's ``contained_user_ids``.
        """
        node = _get_node_or_abort(session, context, request.community_id)

        existing_member = node.official_cluster.members.where(User.id == context.user_id).one_or_none()

        if not existing_member:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)

        if context.user_id in node.contained_user_ids:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)

        removal = (
            delete(ClusterSubscription)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
            .where(ClusterSubscription.user_id == context.user_id)
        )
        session.execute(removal)

        return empty_pb2.Empty()

    def ListUserCommunities(self, request, context, session):
        """List the communities a user is subscribed to, ordered by ascending node id.

        The user is ``request.user_id`` or, when that is unset (0), the
        caller. The page token is the smallest node id to include.
        """
        page_size = _requested_page_size(request)
        first_node_id = int(request.page_token) if request.page_token else 0
        user_id = request.user_id or context.user_id

        statement = (
            select(Node)
            .join(Cluster, Cluster.parent_node_id == Node.id)
            .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
            .where(ClusterSubscription.user_id == user_id)
            .where(Cluster.is_official_cluster)
            .where(Node.id >= first_node_id)
            .order_by(Node.id)
            .limit(page_size + 1)
        )
        nodes = session.execute(statement).scalars().all()

        return communities_pb2.ListUserCommunitiesRes(
            communities=communities_to_pb(session, nodes[:page_size], context),
            next_page_token=_next_id_token(nodes, page_size),
        )