Coverage for src/couchers/servicers/communities.py: 83%
163 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func, or_
8from couchers import errors
9from couchers.db import can_moderate_node, get_node_parents_recursively
10from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts
11from couchers.models import (
12 Cluster,
13 ClusterRole,
14 ClusterSubscription,
15 Discussion,
16 Event,
17 EventOccurrence,
18 Node,
19 Page,
20 PageType,
21 User,
22)
23from couchers.servicers.discussions import discussion_to_pb
24from couchers.servicers.events import event_to_pb
25from couchers.servicers.groups import group_to_pb
26from couchers.servicers.pages import page_to_pb
27from couchers.sql import couchers_select as select
28from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
29from proto import communities_pb2, communities_pb2_grpc, groups_pb2
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound on the number of items a paginated list RPC returns per page;
# also the page size used when a request leaves `page_size` unset (falsy).
MAX_PAGINATION_LENGTH = 25
def _parents_to_pb(session, node_id):
    """Convert the rows from `get_node_parents_recursively` into `groups_pb2.Parent` messages.

    Each row is unpacked as `(node_id, parent_node_id, level, cluster)`; only
    the row's node id and cluster are used. The output keeps the row order.
    """
    parent_pbs = []
    for ancestor_id, _parent_node_id, _level, cluster in get_node_parents_recursively(session, node_id):
        community = groups_pb2.CommunityParent(
            community_id=ancestor_id,
            name=cluster.name,
            slug=cluster.slug,
            description=cluster.description,
        )
        parent_pbs.append(groups_pb2.Parent(community=community))
    return parent_pbs
def community_to_pb(session, node: Node, context):
    """Build the `communities_pb2.Community` message for a community node.

    Args:
        session: database session used for the lookups below.
        node: the community node to serialize; its `official_cluster` supplies
            the name, slug, description and main page.
        context: request context; `context.user_id` identifies the viewer for
            the `member`, `admin` and `can_moderate` flags.

    Returns:
        The populated `communities_pb2.Community` message.
    """
    cluster = node.official_cluster
    can_moderate = can_moderate_node(session, context.user_id, node.id)

    # Both counts fall back to 1 when the counts view has no row for this
    # cluster (or reports a falsy count).
    member_count = (
        session.execute(
            select(cluster_subscription_counts.c.count).where(cluster_subscription_counts.c.cluster_id == cluster.id)
        ).scalar_one_or_none()
        or 1
    )
    admin_count = (
        session.execute(
            select(cluster_admin_counts.c.count).where(cluster_admin_counts.c.cluster_id == cluster.id)
        ).scalar_one_or_none()
        or 1
    )

    # A single subscription lookup answers both questions: the viewer is a
    # member if the row exists, and an admin if that row has the admin role.
    subscription = session.execute(
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == context.user_id)
        .where(ClusterSubscription.cluster_id == cluster.id)
    ).scalar_one_or_none()
    is_member = subscription is not None
    is_admin = is_member and subscription.role == ClusterRole.admin

    return communities_pb2.Community(
        community_id=node.id,
        name=cluster.name,
        slug=cluster.slug,
        description=cluster.description,
        created=Timestamp_from_datetime(node.created),
        parents=_parents_to_pb(session, node.id),
        member=is_member,
        admin=is_admin,
        member_count=member_count,
        admin_count=admin_count,
        main_page=page_to_pb(session, cluster.main_page, context),
        can_moderate=can_moderate,
    )
def _clamp_page_size(request):
    """Return the requested page size capped at MAX_PAGINATION_LENGTH (the cap itself if unset)."""
    return min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)


def _id_from_page_token(request):
    """Parse the request's page token as an integer id, or 0 when no token was sent."""
    return int(request.page_token) if request.page_token else 0


def _next_id_token(rows, page_size):
    """Return the next page token for rows fetched with `limit(page_size + 1)`.

    When the extra row is present it is the first row of the next page, so its
    id becomes the token; otherwise there is no next page and None is returned.
    """
    return str(rows[-1].id) if len(rows) > page_size else None


def _get_node_or_abort(session, context, community_id):
    """Load the Node with the given id, aborting the RPC with NOT_FOUND if it does not exist."""
    node = session.execute(select(Node).where(Node.id == community_id)).scalar_one_or_none()
    if not node:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    return node


def _get_subscription_for_role_change(session, context, request):
    """Run the checks shared by AddAdmin/RemoveAdmin and return the target user's subscription.

    Aborts, in this order, when: the community does not exist (NOT_FOUND), the
    caller cannot moderate it (FAILED_PRECONDITION), the target user is not
    visible/existing (NOT_FOUND), or the target user has no subscription to the
    community's official cluster (FAILED_PRECONDITION).
    """
    node = _get_node_or_abort(session, context, request.community_id)
    if not can_moderate_node(session, context.user_id, node.id):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)

    user = session.execute(
        select(User).where_users_visible(context).where(User.id == request.user_id)
    ).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    subscription = session.execute(
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == user.id)
        .where(ClusterSubscription.cluster_id == node.official_cluster.id)
    ).scalar_one_or_none()
    if not subscription:
        # A role can only be changed for someone who is already a member
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
    return subscription


def _list_owned_pages(request, context, session, page_type):
    """Return `(page_pbs, next_page_token)` for the community's owned pages of the given type.

    Pages are listed in ascending id order starting at the id in the page token.
    """
    page_size = _clamp_page_size(request)
    next_page_id = _id_from_page_token(request)
    node = _get_node_or_abort(session, context, request.community_id)
    pages = (
        node.official_cluster.owned_pages.where(Page.type == page_type)
        .where(Page.id >= next_page_id)
        .order_by(Page.id)
        .limit(page_size + 1)
        .all()
    )
    page_pbs = [page_to_pb(session, page, context) for page in pages[:page_size]]
    return page_pbs, _next_id_token(pages, page_size)


class Communities(communities_pb2_grpc.CommunitiesServicer):
    """Servicer implementing the RPCs of `communities_pb2_grpc.CommunitiesServicer`.

    List RPCs fetch `page_size + 1` rows so that the extra row, when present,
    supplies the `next_page_token`.
    """

    def GetCommunity(self, request, context, session):
        """Return the community with id `request.community_id`, or abort with NOT_FOUND."""
        node = _get_node_or_abort(session, context, request.community_id)
        return community_to_pb(session, node, context)

    def ListCommunities(self, request, context, session):
        """List child communities of `request.community_id`; a community_id of 0 lists all nodes."""
        page_size = _clamp_page_size(request)
        next_node_id = _id_from_page_token(request)
        nodes = (
            session.execute(
                select(Node)
                .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
                .where(Node.id >= next_node_id)
                .order_by(Node.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListCommunitiesRes(
            communities=[community_to_pb(session, node, context) for node in nodes[:page_size]],
            next_page_token=_next_id_token(nodes, page_size),
        )

    def ListGroups(self, request, context, session):
        """List the non-official clusters (groups) whose parent node is `request.community_id`."""
        page_size = _clamp_page_size(request)
        next_cluster_id = _id_from_page_token(request)
        clusters = (
            session.execute(
                select(Cluster)
                .where(~Cluster.is_official_cluster)  # not an official group
                .where(Cluster.parent_node_id == request.community_id)
                .where(Cluster.id >= next_cluster_id)
                .order_by(Cluster.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListGroupsRes(
            groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
            next_page_token=_next_id_token(clusters, page_size),
        )

    def ListAdmins(self, request, context, session):
        """List ids of visible users holding the admin role in the community's official cluster."""
        page_size = _clamp_page_size(request)
        next_admin_id = _id_from_page_token(request)
        node = _get_node_or_abort(session, context, request.community_id)
        admins = (
            session.execute(
                select(User)
                .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                .where_users_visible(context)
                .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                .where(ClusterSubscription.role == ClusterRole.admin)
                .where(User.id >= next_admin_id)
                .order_by(User.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListAdminsRes(
            admin_user_ids=[admin.id for admin in admins[:page_size]],
            next_page_token=_next_id_token(admins, page_size),
        )

    def AddAdmin(self, request, context, session):
        """Promote the member `request.user_id` to admin; aborts if they already are one."""
        subscription = _get_subscription_for_role_change(session, context, request)
        if subscription.role == ClusterRole.admin:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN)

        subscription.role = ClusterRole.admin

        return empty_pb2.Empty()

    def RemoveAdmin(self, request, context, session):
        """Demote the admin `request.user_id` to plain member; aborts if they are not an admin."""
        subscription = _get_subscription_for_role_change(session, context, request)
        if subscription.role == ClusterRole.member:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

        subscription.role = ClusterRole.member

        return empty_pb2.Empty()

    def ListMembers(self, request, context, session):
        """List ids of visible users subscribed to the community's official cluster."""
        page_size = _clamp_page_size(request)
        next_member_id = _id_from_page_token(request)
        node = _get_node_or_abort(session, context, request.community_id)
        members = (
            session.execute(
                select(User)
                .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                .where_users_visible(context)
                .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                .where(User.id >= next_member_id)
                .order_by(User.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListMembersRes(
            member_user_ids=[member.id for member in members[:page_size]],
            next_page_token=_next_id_token(members, page_size),
        )

    def ListNearbyUsers(self, request, context, session):
        """List ids of visible users whose `geom` is contained in the community node's `geom`."""
        page_size = _clamp_page_size(request)
        next_nearby_id = _id_from_page_token(request)
        node = _get_node_or_abort(session, context, request.community_id)
        nearbys = (
            session.execute(
                select(User)
                .where_users_visible(context)
                .where(func.ST_Contains(node.geom, User.geom))
                .where(User.id >= next_nearby_id)
                .order_by(User.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListNearbyUsersRes(
            nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
            next_page_token=_next_id_token(nearbys, page_size),
        )

    def ListPlaces(self, request, context, session):
        """List the pages of type `PageType.place` owned by the community's official cluster."""
        places, next_page_token = _list_owned_pages(request, context, session, PageType.place)
        return communities_pb2.ListPlacesRes(places=places, next_page_token=next_page_token)

    def ListGuides(self, request, context, session):
        """List the pages of type `PageType.guide` owned by the community's official cluster."""
        guides, next_page_token = _list_owned_pages(request, context, session, PageType.guide)
        return communities_pb2.ListGuidesRes(guides=guides, next_page_token=next_page_token)

    def ListEvents(self, request, context, session):
        """List event occurrences owned by the community's official cluster or parented by its node.

        With `request.past` set, occurrences ending before the token time are
        returned newest-start first; otherwise occurrences ending after the
        token time are returned earliest-start first.
        """
        page_size = _clamp_page_size(request)
        # the page token is a unix timestamp (in milliseconds) of where we left off
        page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

        node = _get_node_or_abort(session, context, request.community_id)

        # for communities, we list events owned by this community or for which this is a parent
        occurrences = (
            select(EventOccurrence)
            .join(Event, Event.id == EventOccurrence.event_id)
            .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node))
        )

        # NOTE(review): rows are ordered by start_time while the filter and the page
        # token use end_time; pages may overlap or skip occurrences when those two
        # orders disagree - confirm whether this is intended before changing it.
        slack = timedelta(seconds=1)
        if request.past:
            occurrences = occurrences.where(EventOccurrence.end_time < page_token + slack).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            occurrences = occurrences.where(EventOccurrence.end_time > page_token - slack).order_by(
                EventOccurrence.start_time.asc()
            )

        occurrences = occurrences.limit(page_size + 1)
        occurrences = session.execute(occurrences).scalars().all()

        return communities_pb2.ListEventsRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
        )

    def ListDiscussions(self, request, context, session):
        """List discussions owned by the community's official cluster, highest id first."""
        page_size = _clamp_page_size(request)
        next_page_id = _id_from_page_token(request)
        node = _get_node_or_abort(session, context, request.community_id)
        # a token of 0 means "first page": no upper bound on the discussion id
        discussions = (
            node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
            .order_by(Discussion.id.desc())
            .limit(page_size + 1)
            .all()
        )
        return communities_pb2.ListDiscussionsRes(
            discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
            next_page_token=_next_id_token(discussions, page_size),
        )

    def JoinCommunity(self, request, context, session):
        """Subscribe the calling user to the community as a member; aborts if already subscribed."""
        node = _get_node_or_abort(session, context, request.community_id)

        current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
        if current_membership:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)

        node.official_cluster.cluster_subscriptions.append(
            ClusterSubscription(
                user_id=context.user_id,
                role=ClusterRole.member,
            )
        )

        return empty_pb2.Empty()

    def LeaveCommunity(self, request, context, session):
        """Remove the calling user's subscription to the community.

        Aborts with FAILED_PRECONDITION when the user is not a member, or when
        the user's id is in `node.contained_user_ids`.
        """
        node = _get_node_or_abort(session, context, request.community_id)

        current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()

        if not current_membership:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)

        if context.user_id in node.contained_user_ids:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)

        session.execute(
            delete(ClusterSubscription)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
            .where(ClusterSubscription.user_id == context.user_id)
        )

        return empty_pb2.Empty()

    def ListUserCommunities(self, request, context, session):
        """List communities whose official cluster `request.user_id` (default: the caller) is subscribed to."""
        page_size = _clamp_page_size(request)
        next_node_id = _id_from_page_token(request)
        user_id = request.user_id or context.user_id
        nodes = (
            session.execute(
                select(Node)
                .join(Cluster, Cluster.parent_node_id == Node.id)
                .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                .where(ClusterSubscription.user_id == user_id)
                .where(Cluster.is_official_cluster)
                .where(Node.id >= next_node_id)
                .order_by(Node.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        return communities_pb2.ListUserCommunitiesRes(
            communities=[community_to_pb(session, node, context) for node in nodes[:page_size]],
            next_page_token=_next_id_token(nodes, page_size),
        )