Coverage for src/couchers/servicers/communities.py: 83%
168 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 02:39 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 02:39 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func, or_
8from couchers import errors
9from couchers.crypto import decrypt_page_token, encrypt_page_token
10from couchers.db import can_moderate_node, get_node_parents_recursively
11from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts
12from couchers.models import (
13 Cluster,
14 ClusterRole,
15 ClusterSubscription,
16 Discussion,
17 Event,
18 EventOccurrence,
19 Node,
20 Page,
21 PageType,
22 User,
23)
24from couchers.servicers.discussions import discussion_to_pb
25from couchers.servicers.events import event_to_pb
26from couchers.servicers.groups import group_to_pb
27from couchers.servicers.pages import page_to_pb
28from couchers.sql import couchers_select as select
29from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
30from proto import communities_pb2, communities_pb2_grpc, groups_pb2
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Cap applied to `request.page_size` by the List* RPCs in this module; it is also the page size used when the
# request's page_size is falsy (e.g. 0).
MAX_PAGINATION_LENGTH = 25
def _parents_to_pb(session, node_id):
    """Build the list of `groups_pb2.Parent` messages for the node with id `node_id`.

    Every row produced by `get_node_parents_recursively(session, node_id)` is unpacked as
    `(node_id, parent_node_id, level, cluster)` and turned into one `Parent` wrapping a
    `CommunityParent`. The output keeps the order in which the helper returned the rows.
    """
    parent_pbs = []
    for ancestor_id, _parent_node_id, _level, cluster in get_node_parents_recursively(session, node_id):
        community_parent = groups_pb2.CommunityParent(
            community_id=ancestor_id,
            name=cluster.name,
            slug=cluster.slug,
            description=cluster.description,
        )
        parent_pbs.append(groups_pb2.Parent(community=community_parent))
    return parent_pbs
def communities_to_pb(session, nodes: list[Node], context):
    """Convert a batch of nodes into `communities_pb2.Community` messages.

    Membership/admin counts and the calling user's own membership/adminship are fetched with one query
    each for the whole batch, keyed by the id of every node's official cluster. Moderation rights,
    parents and the main page are still resolved per node.

    Args:
        session: database session used for all lookups.
        nodes: nodes to convert; the result has one message per node, in the same order.
        context: request context; `context.user_id` identifies the calling user.

    Returns:
        A list of `communities_pb2.Community`, empty when `nodes` is empty.
    """
    if not nodes:
        # Nothing to convert: avoid four round trips that would each filter on an empty IN () list.
        return []

    can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes]

    official_clusters = [node.official_cluster for node in nodes]
    official_cluster_ids = [cluster.id for cluster in official_clusters]

    # cluster_id -> number of subscriptions, read from the materialized view
    member_counts = dict(
        session.execute(
            select(cluster_subscription_counts.c.cluster_id, cluster_subscription_counts.c.count).where(
                cluster_subscription_counts.c.cluster_id.in_(official_cluster_ids)
            )
        ).all()
    )
    # ids of the clusters in this batch the calling user is subscribed to
    cluster_memberships = set(
        session.execute(
            select(ClusterSubscription.cluster_id)
            .where(ClusterSubscription.user_id == context.user_id)
            .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
        )
        .scalars()
        .all()
    )

    # cluster_id -> number of admins, read from the materialized view
    admin_counts = dict(
        session.execute(
            select(cluster_admin_counts.c.cluster_id, cluster_admin_counts.c.count).where(
                cluster_admin_counts.c.cluster_id.in_(official_cluster_ids)
            )
        ).all()
    )
    # ids of the clusters in this batch the calling user administers
    cluster_adminships = set(
        session.execute(
            select(ClusterSubscription.cluster_id)
            .where(ClusterSubscription.user_id == context.user_id)
            .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
            .where(ClusterSubscription.role == ClusterRole.admin)
        )
        .scalars()
        .all()
    )

    return [
        communities_pb2.Community(
            community_id=node.id,
            name=official_cluster.name,
            slug=official_cluster.slug,
            description=official_cluster.description,
            created=Timestamp_from_datetime(node.created),
            parents=_parents_to_pb(session, node.id),
            member=official_cluster.id in cluster_memberships,
            admin=official_cluster.id in cluster_adminships,
            # a cluster with no row in the counts view is reported with a count of 1
            member_count=member_counts.get(official_cluster.id, 1),
            admin_count=admin_counts.get(official_cluster.id, 1),
            main_page=page_to_pb(session, official_cluster.main_page, context),
            can_moderate=can_moderate,
        )
        for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates)
    ]
def community_to_pb(session, node: Node, context):
    """Convert a single node into a `communities_pb2.Community` by delegating to `communities_to_pb`."""
    converted = communities_to_pb(session, [node], context)
    return converted[0]
116class Communities(communities_pb2_grpc.CommunitiesServicer):
117 def GetCommunity(self, request, context, session):
118 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
119 if not node:
120 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
122 return community_to_pb(session, node, context)
124 def ListCommunities(self, request, context, session):
125 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
126 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0
127 nodes = (
128 session.execute(
129 select(Node)
130 .join(Cluster, Cluster.parent_node_id == Node.id)
131 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
132 .where(Cluster.is_official_cluster)
133 .order_by(Cluster.name)
134 .limit(page_size + 1)
135 .offset(offset)
136 )
137 .scalars()
138 .all()
139 )
140 return communities_pb2.ListCommunitiesRes(
141 communities=communities_to_pb(session, nodes[:page_size], context),
142 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None,
143 )
145 def ListGroups(self, request, context, session):
146 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
147 next_cluster_id = int(request.page_token) if request.page_token else 0
148 clusters = (
149 session.execute(
150 select(Cluster)
151 .where(~Cluster.is_official_cluster) # not an official group
152 .where(Cluster.parent_node_id == request.community_id)
153 .where(Cluster.id >= next_cluster_id)
154 .order_by(Cluster.id)
155 .limit(page_size + 1)
156 )
157 .scalars()
158 .all()
159 )
160 return communities_pb2.ListGroupsRes(
161 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
162 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
163 )
165 def ListAdmins(self, request, context, session):
166 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
167 next_admin_id = int(request.page_token) if request.page_token else 0
168 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
169 if not node:
170 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
171 admins = (
172 session.execute(
173 select(User)
174 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
175 .where_users_visible(context)
176 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
177 .where(ClusterSubscription.role == ClusterRole.admin)
178 .where(User.id >= next_admin_id)
179 .order_by(User.id)
180 .limit(page_size + 1)
181 )
182 .scalars()
183 .all()
184 )
185 return communities_pb2.ListAdminsRes(
186 admin_user_ids=[admin.id for admin in admins[:page_size]],
187 next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
188 )
190 def AddAdmin(self, request, context, session):
191 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
192 if not node:
193 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
194 if not can_moderate_node(session, context.user_id, node.id):
195 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)
197 user = session.execute(
198 select(User).where_users_visible(context).where(User.id == request.user_id)
199 ).scalar_one_or_none()
200 if not user:
201 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
203 subscription = session.execute(
204 select(ClusterSubscription)
205 .where(ClusterSubscription.user_id == user.id)
206 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
207 ).scalar_one_or_none()
208 if not subscription:
209 # Can't upgrade a member to admin if they're not already a member
210 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
211 if subscription.role == ClusterRole.admin:
212 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN)
214 subscription.role = ClusterRole.admin
216 return empty_pb2.Empty()
218 def RemoveAdmin(self, request, context, session):
219 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
220 if not node:
221 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
222 if not can_moderate_node(session, context.user_id, node.id):
223 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)
225 user = session.execute(
226 select(User).where_users_visible(context).where(User.id == request.user_id)
227 ).scalar_one_or_none()
228 if not user:
229 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
231 subscription = session.execute(
232 select(ClusterSubscription)
233 .where(ClusterSubscription.user_id == user.id)
234 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
235 ).scalar_one_or_none()
236 if not subscription:
237 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
238 if subscription.role == ClusterRole.member:
239 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)
241 subscription.role = ClusterRole.member
243 return empty_pb2.Empty()
245 def ListMembers(self, request, context, session):
246 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
247 next_member_id = int(request.page_token) if request.page_token else 0
248 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
249 if not node:
250 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
251 members = (
252 session.execute(
253 select(User)
254 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
255 .where_users_visible(context)
256 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
257 .where(User.id >= next_member_id)
258 .order_by(User.id)
259 .limit(page_size + 1)
260 )
261 .scalars()
262 .all()
263 )
264 return communities_pb2.ListMembersRes(
265 member_user_ids=[member.id for member in members[:page_size]],
266 next_page_token=str(members[-1].id) if len(members) > page_size else None,
267 )
269 def ListNearbyUsers(self, request, context, session):
270 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
271 next_nearby_id = int(request.page_token) if request.page_token else 0
272 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
273 if not node:
274 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
275 nearbys = (
276 session.execute(
277 select(User)
278 .where_users_visible(context)
279 .where(func.ST_Contains(node.geom, User.geom))
280 .where(User.id >= next_nearby_id)
281 .order_by(User.id)
282 .limit(page_size + 1)
283 )
284 .scalars()
285 .all()
286 )
287 return communities_pb2.ListNearbyUsersRes(
288 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
289 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None,
290 )
292 def ListPlaces(self, request, context, session):
293 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
294 next_page_id = int(request.page_token) if request.page_token else 0
295 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
296 if not node:
297 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
298 places = (
299 node.official_cluster.owned_pages.where(Page.type == PageType.place)
300 .where(Page.id >= next_page_id)
301 .order_by(Page.id)
302 .limit(page_size + 1)
303 .all()
304 )
305 return communities_pb2.ListPlacesRes(
306 places=[page_to_pb(session, page, context) for page in places[:page_size]],
307 next_page_token=str(places[-1].id) if len(places) > page_size else None,
308 )
310 def ListGuides(self, request, context, session):
311 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
312 next_page_id = int(request.page_token) if request.page_token else 0
313 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
314 if not node:
315 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
316 guides = (
317 node.official_cluster.owned_pages.where(Page.type == PageType.guide)
318 .where(Page.id >= next_page_id)
319 .order_by(Page.id)
320 .limit(page_size + 1)
321 .all()
322 )
323 return communities_pb2.ListGuidesRes(
324 guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
325 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
326 )
328 def ListEvents(self, request, context, session):
329 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
330 # the page token is a unix timestamp of where we left off
331 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
333 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
334 if not node:
335 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
337 # for communities, we list events owned by this community or for which this is a parent
338 occurrences = (
339 select(EventOccurrence)
340 .join(Event, Event.id == EventOccurrence.event_id)
341 .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node))
342 )
344 if request.past:
345 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
346 EventOccurrence.start_time.desc()
347 )
348 else:
349 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
350 EventOccurrence.start_time.asc()
351 )
353 occurrences = occurrences.limit(page_size + 1)
354 occurrences = session.execute(occurrences).scalars().all()
356 return communities_pb2.ListEventsRes(
357 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
358 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
359 )
361 def ListDiscussions(self, request, context, session):
362 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
363 next_page_id = int(request.page_token) if request.page_token else 0
364 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
365 if not node:
366 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
367 discussions = (
368 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
369 .order_by(Discussion.id.desc())
370 .limit(page_size + 1)
371 .all()
372 )
373 return communities_pb2.ListDiscussionsRes(
374 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
375 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
376 )
378 def JoinCommunity(self, request, context, session):
379 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
380 if not node:
381 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
383 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
384 if current_membership:
385 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)
387 node.official_cluster.cluster_subscriptions.append(
388 ClusterSubscription(
389 user_id=context.user_id,
390 role=ClusterRole.member,
391 )
392 )
394 return empty_pb2.Empty()
396 def LeaveCommunity(self, request, context, session):
397 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
398 if not node:
399 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
401 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
403 if not current_membership:
404 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)
406 if context.user_id in node.contained_user_ids:
407 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)
409 session.execute(
410 delete(ClusterSubscription)
411 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
412 .where(ClusterSubscription.user_id == context.user_id)
413 )
415 return empty_pb2.Empty()
417 def ListUserCommunities(self, request, context, session):
418 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
419 next_node_id = int(request.page_token) if request.page_token else 0
420 user_id = request.user_id or context.user_id
421 nodes = (
422 session.execute(
423 select(Node)
424 .join(Cluster, Cluster.parent_node_id == Node.id)
425 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
426 .where(ClusterSubscription.user_id == user_id)
427 .where(Cluster.is_official_cluster)
428 .where(Node.id >= next_node_id)
429 .order_by(Node.id)
430 .limit(page_size + 1)
431 )
432 .scalars()
433 .all()
434 )
436 return communities_pb2.ListUserCommunitiesRes(
437 communities=communities_to_pb(session, nodes[:page_size], context),
438 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
439 )