Coverage for src/couchers/servicers/communities.py: 83%
Shortcuts on this page:
r, m, x: toggle line displays
j, k: next/prev highlighted chunk
0 (zero): top of page
1 (one): first highlighted chunk
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func, or_
8from couchers import errors
9from couchers.db import can_moderate_node, get_node_parents_recursively, session_scope
10from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts
11from couchers.models import (
12 Cluster,
13 ClusterRole,
14 ClusterSubscription,
15 Discussion,
16 Event,
17 EventOccurrence,
18 Node,
19 Page,
20 PageType,
21 User,
22)
23from couchers.servicers.discussions import discussion_to_pb
24from couchers.servicers.events import event_to_pb
25from couchers.servicers.groups import group_to_pb
26from couchers.servicers.pages import page_to_pb
27from couchers.sql import couchers_select as select
28from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
29from proto import communities_pb2, communities_pb2_grpc, groups_pb2
# module-level logger, named after this module
logger = logging.getLogger(__name__)

# upper bound on the page size of every paginated RPC in this module; also used when the request gives no page size
MAX_PAGINATION_LENGTH = 25
def _parents_to_pb(node_id):
    """
    Builds the list of `groups_pb2.Parent` messages for the rows `get_node_parents_recursively` returns for a node.

    Opens its own database session. Each returned row contributes one `Parent` wrapping a `CommunityParent` filled
    with the row's node id and its cluster's name, slug and description. The result keeps the order in which the
    helper yields its rows.
    """
    with session_scope() as session:
        rows = get_node_parents_recursively(session, node_id)

        ancestry = []
        # each row is (node id, parent node id, level, cluster); only the node id and the cluster are used
        for ancestor_id, _parent_node_id, _level, ancestor_cluster in rows:
            community = groups_pb2.CommunityParent(
                community_id=ancestor_id,
                name=ancestor_cluster.name,
                slug=ancestor_cluster.slug,
                description=ancestor_cluster.description,
            )
            ancestry.append(groups_pb2.Parent(community=community))
        return ancestry
def community_to_pb(node: Node, context):
    """
    Converts a community `Node` into a `communities_pb2.Community` message from the calling user's point of view.

    Args:
        node: the community node; its official cluster supplies the name, slug, description and main page.
        context: the request context; `context.user_id` decides the `member`, `admin` and `can_moderate` fields.

    Returns:
        A populated `communities_pb2.Community`.

    The member and admin counts are read from the `cluster_subscription_counts` and `cluster_admin_counts`
    materialized views, and fall back to 1 when the view has no row (or a zero count) for the cluster.
    """
    with session_scope() as session:
        cluster = node.official_cluster

        can_moderate = can_moderate_node(session, context.user_id, node.id)

        member_count = (
            session.execute(
                select(cluster_subscription_counts.c.count).where(
                    cluster_subscription_counts.c.cluster_id == cluster.id
                )
            ).scalar_one_or_none()
            or 1
        )

        admin_count = (
            session.execute(
                select(cluster_admin_counts.c.count).where(cluster_admin_counts.c.cluster_id == cluster.id)
            ).scalar_one_or_none()
            or 1
        )

        # A single lookup of the caller's subscription answers both "is a member" and "is an admin", so the
        # admin check does not need a second query that repeats this one with an extra role filter.
        subscription = session.execute(
            select(ClusterSubscription)
            .where(ClusterSubscription.user_id == context.user_id)
            .where(ClusterSubscription.cluster_id == cluster.id)
        ).scalar_one_or_none()
        is_member = subscription is not None
        is_admin = is_member and subscription.role == ClusterRole.admin

        return communities_pb2.Community(
            community_id=node.id,
            name=cluster.name,
            slug=cluster.slug,
            description=cluster.description,
            created=Timestamp_from_datetime(node.created),
            parents=_parents_to_pb(node.id),
            member=is_member,
            admin=is_admin,
            member_count=member_count,
            admin_count=admin_count,
            main_page=page_to_pb(cluster.main_page, context),
            can_moderate=can_moderate,
        )
107class Communities(communities_pb2_grpc.CommunitiesServicer):
108 def GetCommunity(self, request, context):
109 with session_scope() as session:
110 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
111 if not node:
112 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
114 return community_to_pb(node, context)
116 def ListCommunities(self, request, context):
117 with session_scope() as session:
118 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
119 next_node_id = int(request.page_token) if request.page_token else 0
120 nodes = (
121 session.execute(
122 select(Node)
123 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
124 .where(Node.id >= next_node_id)
125 .order_by(Node.id)
126 .limit(page_size + 1)
127 )
128 .scalars()
129 .all()
130 )
131 return communities_pb2.ListCommunitiesRes(
132 communities=[community_to_pb(node, context) for node in nodes[:page_size]],
133 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
134 )
136 def ListGroups(self, request, context):
137 with session_scope() as session:
138 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
139 next_cluster_id = int(request.page_token) if request.page_token else 0
140 clusters = (
141 session.execute(
142 select(Cluster)
143 .where(~Cluster.is_official_cluster) # not an official group
144 .where(Cluster.parent_node_id == request.community_id)
145 .where(Cluster.id >= next_cluster_id)
146 .order_by(Cluster.id)
147 .limit(page_size + 1)
148 )
149 .scalars()
150 .all()
151 )
152 return communities_pb2.ListGroupsRes(
153 groups=[group_to_pb(cluster, context) for cluster in clusters[:page_size]],
154 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
155 )
157 def ListAdmins(self, request, context):
158 with session_scope() as session:
159 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
160 next_admin_id = int(request.page_token) if request.page_token else 0
161 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
162 if not node:
163 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
164 admins = (
165 session.execute(
166 select(User)
167 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
168 .where_users_visible(context)
169 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
170 .where(ClusterSubscription.role == ClusterRole.admin)
171 .where(User.id >= next_admin_id)
172 .order_by(User.id)
173 .limit(page_size + 1)
174 )
175 .scalars()
176 .all()
177 )
178 return communities_pb2.ListAdminsRes(
179 admin_user_ids=[admin.id for admin in admins[:page_size]],
180 next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
181 )
183 def ListMembers(self, request, context):
184 with session_scope() as session:
185 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
186 next_member_id = int(request.page_token) if request.page_token else 0
187 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
188 if not node:
189 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
190 members = (
191 session.execute(
192 select(User)
193 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
194 .where_users_visible(context)
195 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
196 .where(User.id >= next_member_id)
197 .order_by(User.id)
198 .limit(page_size + 1)
199 )
200 .scalars()
201 .all()
202 )
203 return communities_pb2.ListMembersRes(
204 member_user_ids=[member.id for member in members[:page_size]],
205 next_page_token=str(members[-1].id) if len(members) > page_size else None,
206 )
208 def ListNearbyUsers(self, request, context):
209 with session_scope() as session:
210 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
211 next_nearby_id = int(request.page_token) if request.page_token else 0
212 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
213 if not node:
214 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
215 nearbys = (
216 session.execute(
217 select(User)
218 .where_users_visible(context)
219 .where(func.ST_Contains(node.geom, User.geom))
220 .where(User.id >= next_nearby_id)
221 .order_by(User.id)
222 .limit(page_size + 1)
223 )
224 .scalars()
225 .all()
226 )
227 return communities_pb2.ListNearbyUsersRes(
228 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
229 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None,
230 )
232 def ListPlaces(self, request, context):
233 with session_scope() as session:
234 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
235 next_page_id = int(request.page_token) if request.page_token else 0
236 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
237 if not node:
238 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
239 places = (
240 node.official_cluster.owned_pages.where(Page.type == PageType.place)
241 .where(Page.id >= next_page_id)
242 .order_by(Page.id)
243 .limit(page_size + 1)
244 .all()
245 )
246 return communities_pb2.ListPlacesRes(
247 places=[page_to_pb(page, context) for page in places[:page_size]],
248 next_page_token=str(places[-1].id) if len(places) > page_size else None,
249 )
251 def ListGuides(self, request, context):
252 with session_scope() as session:
253 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
254 next_page_id = int(request.page_token) if request.page_token else 0
255 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
256 if not node:
257 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
258 guides = (
259 node.official_cluster.owned_pages.where(Page.type == PageType.guide)
260 .where(Page.id >= next_page_id)
261 .order_by(Page.id)
262 .limit(page_size + 1)
263 .all()
264 )
265 return communities_pb2.ListGuidesRes(
266 guides=[page_to_pb(page, context) for page in guides[:page_size]],
267 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
268 )
270 def ListEvents(self, request, context):
271 with session_scope() as session:
272 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
273 # the page token is a unix timestamp of where we left off
274 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
276 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
277 if not node:
278 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
280 # for communities, we list events owned by this community or for which this is a parent
281 occurrences = (
282 select(EventOccurrence)
283 .join(Event, Event.id == EventOccurrence.event_id)
284 .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node))
285 )
287 if request.past:
288 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
289 EventOccurrence.start_time.desc()
290 )
291 else:
292 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
293 EventOccurrence.start_time.asc()
294 )
296 occurrences = occurrences.limit(page_size + 1)
297 occurrences = session.execute(occurrences).scalars().all()
299 return communities_pb2.ListEventsRes(
300 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
301 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
302 )
304 def ListDiscussions(self, request, context):
305 with session_scope() as session:
306 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
307 next_page_id = int(request.page_token) if request.page_token else 0
308 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
309 if not node:
310 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
311 discussions = (
312 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
313 .order_by(Discussion.id.desc())
314 .limit(page_size + 1)
315 .all()
316 )
317 return communities_pb2.ListDiscussionsRes(
318 discussions=[discussion_to_pb(discussion, context) for discussion in discussions[:page_size]],
319 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
320 )
322 def JoinCommunity(self, request, context):
323 with session_scope() as session:
324 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
325 if not node:
326 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
328 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
329 if current_membership:
330 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)
332 node.official_cluster.cluster_subscriptions.append(
333 ClusterSubscription(
334 user_id=context.user_id,
335 role=ClusterRole.member,
336 )
337 )
339 return empty_pb2.Empty()
341 def LeaveCommunity(self, request, context):
342 with session_scope() as session:
343 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
344 if not node:
345 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
347 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
349 if not current_membership:
350 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)
352 if context.user_id in node.contained_user_ids:
353 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)
355 session.execute(
356 delete(ClusterSubscription)
357 .where(ClusterSubscription.cluster_id == node.official_cluster.id)
358 .where(ClusterSubscription.user_id == context.user_id)
359 )
361 return empty_pb2.Empty()
363 def ListUserCommunities(self, request, context):
364 with session_scope() as session:
365 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
366 next_node_id = int(request.page_token) if request.page_token else 0
367 user_id = request.user_id or context.user_id
368 nodes = (
369 session.execute(
370 select(Node)
371 .join(Cluster, Cluster.parent_node_id == Node.id)
372 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
373 .where(ClusterSubscription.user_id == user_id)
374 .where(Cluster.is_official_cluster)
375 .where(Node.id >= next_node_id)
376 .order_by(Node.id)
377 .limit(page_size + 1)
378 )
379 .scalars()
380 .all()
381 )
383 return communities_pb2.ListUserCommunitiesRes(
384 communities=[community_to_pb(node, context) for node in nodes[:page_size]],
385 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
386 )