Coverage for src/couchers/servicers/communities.py: 83%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func, or_
8from couchers import errors
9from couchers.db import can_moderate_node, get_node_parents_recursively, session_scope
10from couchers.models import (
11 Cluster,
12 ClusterRole,
13 ClusterSubscription,
14 Discussion,
15 Event,
16 EventOccurrence,
17 Node,
18 Page,
19 PageType,
20 User,
21)
22from couchers.servicers.discussions import discussion_to_pb
23from couchers.servicers.events import event_to_pb
24from couchers.servicers.groups import group_to_pb
25from couchers.servicers.pages import page_to_pb
26from couchers.sql import couchers_select as select
27from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
28from proto import communities_pb2, communities_pb2_grpc, groups_pb2
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Default and upper bound for the number of items a list RPC returns per page.
MAX_PAGINATION_LENGTH: int = 25
def _parents_to_pb(node_id):
    """Build the list of ``groups_pb2.Parent`` messages for the node ``node_id``.

    Every row produced by ``get_node_parents_recursively`` becomes one
    ``Parent`` wrapping a ``CommunityParent``; the order of the rows is kept.
    """
    with session_scope() as session:
        ancestry = get_node_parents_recursively(session, node_id)
        parents_pb = []
        # Rows unpack as (node id, parent node id, level, cluster); only the
        # node id and the cluster are needed to describe a parent.
        for ancestor_id, _parent_node_id, _level, cluster in ancestry:
            community = groups_pb2.CommunityParent(
                community_id=ancestor_id,
                name=cluster.name,
                slug=cluster.slug,
                description=cluster.description,
            )
            parents_pb.append(groups_pb2.Parent(community=community))
        return parents_pb
def community_to_pb(node: Node, context):
    """Serialize a community ``Node`` into a ``communities_pb2.Community``.

    Args:
        node: the community node; its ``official_cluster`` supplies the name,
            slug, description and main page.
        context: the request context; ``context.user_id`` identifies the
            calling user and the context is also passed to the visibility
            filter and to ``page_to_pb``.

    Returns:
        A ``communities_pb2.Community`` holding the community's details, its
        parents, the member/admin counts and the caller's own member, admin
        and moderation flags.
    """
    with session_scope() as session:
        can_moderate = can_moderate_node(session, context.user_id, node.id)
        cluster = node.official_cluster

        # Both counts only include subscriptions whose user passes the
        # visibility filter for this context; the admin count narrows the
        # same base statement by role.
        visible_subscriptions = (
            select(func.count())
            .select_from(ClusterSubscription)
            .where_users_column_visible(context, ClusterSubscription.user_id)
            .where(ClusterSubscription.cluster_id == cluster.id)
        )
        member_count = session.execute(visible_subscriptions).scalar_one()
        admin_count = session.execute(
            visible_subscriptions.where(ClusterSubscription.role == ClusterRole.admin)
        ).scalar_one()

        # One lookup of the caller's own subscription answers both "is a
        # member" and "is an admin", instead of querying once per flag.
        own_subscription = session.execute(
            select(ClusterSubscription)
            .where(ClusterSubscription.user_id == context.user_id)
            .where(ClusterSubscription.cluster_id == cluster.id)
        ).scalar_one_or_none()
        is_member = own_subscription is not None
        is_admin = is_member and own_subscription.role == ClusterRole.admin

        return communities_pb2.Community(
            community_id=node.id,
            name=cluster.name,
            slug=cluster.slug,
            description=cluster.description,
            created=Timestamp_from_datetime(node.created),
            parents=_parents_to_pb(node.id),
            member=is_member,
            admin=is_admin,
            member_count=member_count,
            admin_count=admin_count,
            main_page=page_to_pb(cluster.main_page, context),
            can_moderate=can_moderate,
        )
def _page_size_from(request):
    """Return the page size to use for ``request``.

    A zero/unset ``request.page_size`` falls back to ``MAX_PAGINATION_LENGTH``
    and larger values are capped at it.
    """
    return min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)


def _id_from_page_token(request):
    """Parse ``request.page_token`` as an integer id, or return 0 when it is empty.

    A non-numeric token makes ``int`` raise ``ValueError``; that is not handled here.
    """
    return int(request.page_token) if request.page_token else 0


def _get_node_or_abort(session, context, community_id):
    """Load the ``Node`` with id ``community_id``, aborting with NOT_FOUND if there is none."""
    node = session.execute(select(Node).where(Node.id == community_id)).scalar_one_or_none()
    if not node:
        # grpc's context.abort raises, which ends the RPC at this point
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    return node


def _next_page_token(rows, page_size):
    """Return the next page token for an id-paginated listing, or ``None`` on the last page.

    Callers fetch ``page_size + 1`` rows; when the extra row is present, its
    id is where the next page starts.
    """
    return str(rows[-1].id) if len(rows) > page_size else None


class Communities(communities_pb2_grpc.CommunitiesServicer):
    """gRPC servicer for the community RPCs.

    The ``List*`` methods share one pagination scheme: fetch ``page_size + 1``
    rows, return the first ``page_size`` and use the extra row (if any) to
    build ``next_page_token``. Methods that look up a community abort with
    ``NOT_FOUND`` when ``request.community_id`` matches no node.
    """

    def GetCommunity(self, request, context):
        """Return the community with id ``request.community_id``."""
        with session_scope() as session:
            node = _get_node_or_abort(session, context, request.community_id)
            return community_to_pb(node, context)

    def ListCommunities(self, request, context):
        """List the child communities of ``request.community_id``, ordered by node id.

        A ``community_id`` of 0 disables the parent filter, so all nodes are listed.
        """
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_node_id = _id_from_page_token(request)
            nodes = (
                session.execute(
                    select(Node)
                    .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
                    .where(Node.id >= next_node_id)
                    .order_by(Node.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListCommunitiesRes(
                communities=[community_to_pb(node, context) for node in nodes[:page_size]],
                next_page_token=_next_page_token(nodes, page_size),
            )

    def ListGroups(self, request, context):
        """List the non-official clusters (groups) whose parent node is ``request.community_id``."""
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_cluster_id = _id_from_page_token(request)
            clusters = (
                session.execute(
                    select(Cluster)
                    .where(~Cluster.is_official_cluster)  # not an official group
                    .where(Cluster.parent_node_id == request.community_id)
                    .where(Cluster.id >= next_cluster_id)
                    .order_by(Cluster.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListGroupsRes(
                groups=[group_to_pb(cluster, context) for cluster in clusters[:page_size]],
                next_page_token=_next_page_token(clusters, page_size),
            )

    def ListAdmins(self, request, context):
        """List the ids of visible users who are admins of the community's official cluster."""
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_admin_id = _id_from_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            admins = (
                session.execute(
                    select(User)
                    .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                    .where_users_visible(context)
                    .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                    .where(ClusterSubscription.role == ClusterRole.admin)
                    .where(User.id >= next_admin_id)
                    .order_by(User.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListAdminsRes(
                admin_user_ids=[admin.id for admin in admins[:page_size]],
                next_page_token=_next_page_token(admins, page_size),
            )

    def ListMembers(self, request, context):
        """List the ids of visible users subscribed to the community's official cluster."""
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_member_id = _id_from_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            members = (
                session.execute(
                    select(User)
                    .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                    .where_users_visible(context)
                    .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                    .where(User.id >= next_member_id)
                    .order_by(User.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListMembersRes(
                member_user_ids=[member.id for member in members[:page_size]],
                next_page_token=_next_page_token(members, page_size),
            )

    def ListNearbyUsers(self, request, context):
        """List the ids of visible users whose ``geom`` is contained in the community's ``geom``."""
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_nearby_id = _id_from_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            nearbys = (
                session.execute(
                    select(User)
                    .where_users_visible(context)
                    .where(func.ST_Contains(node.geom, User.geom))
                    .where(User.id >= next_nearby_id)
                    .order_by(User.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListNearbyUsersRes(
                nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
                next_page_token=_next_page_token(nearbys, page_size),
            )

    def ListPlaces(self, request, context):
        """List pages of type ``place`` owned by the community's official cluster, ordered by id."""
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_page_id = _id_from_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            places = (
                node.official_cluster.owned_pages.where(Page.type == PageType.place)
                .where(Page.id >= next_page_id)
                .order_by(Page.id)
                .limit(page_size + 1)
                .all()
            )
            return communities_pb2.ListPlacesRes(
                places=[page_to_pb(page, context) for page in places[:page_size]],
                next_page_token=_next_page_token(places, page_size),
            )

    def ListGuides(self, request, context):
        """List pages of type ``guide`` owned by the community's official cluster, ordered by id."""
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_page_id = _id_from_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            guides = (
                node.official_cluster.owned_pages.where(Page.type == PageType.guide)
                .where(Page.id >= next_page_id)
                .order_by(Page.id)
                .limit(page_size + 1)
                .all()
            )
            return communities_pb2.ListGuidesRes(
                guides=[page_to_pb(page, context) for page in guides[:page_size]],
                next_page_token=_next_page_token(guides, page_size),
            )

    def ListEvents(self, request, context):
        """List event occurrences owned by the community or having it as parent node.

        With ``request.past`` set, occurrences ending before the page token are
        returned newest start first; otherwise occurrences ending after it are
        returned oldest start first. Without a token, "now" is used.
        """
        with session_scope() as session:
            page_size = _page_size_from(request)
            # the page token is a unix timestamp of where we left off
            page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

            node = _get_node_or_abort(session, context, request.community_id)

            # for communities, we list events owned by this community or for which this is a parent
            occurrences = (
                select(EventOccurrence)
                .join(Event, Event.id == EventOccurrence.event_id)
                .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node))
            )

            # The one-second slack presumably keeps the look-ahead row (whose end_time
            # became the token) inside the next page despite the millisecond rounding
            # of the token - TODO confirm.
            # NOTE(review): the token is derived from end_time while rows are ordered by
            # start_time; if the two orderings disagree, pages may repeat or skip rows.
            slack = timedelta(seconds=1)
            if request.past:
                occurrences = occurrences.where(EventOccurrence.end_time < page_token + slack).order_by(
                    EventOccurrence.start_time.desc()
                )
            else:
                occurrences = occurrences.where(EventOccurrence.end_time > page_token - slack).order_by(
                    EventOccurrence.start_time.asc()
                )

            occurrences = occurrences.limit(page_size + 1)
            occurrences = session.execute(occurrences).scalars().all()

            return communities_pb2.ListEventsRes(
                events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
                next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
            )

    def ListDiscussions(self, request, context):
        """List discussions owned by the community's official cluster, highest id first.

        The page token is the id to continue from (inclusive); no token starts from the top.
        """
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_page_id = _id_from_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            discussions = (
                node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
                .order_by(Discussion.id.desc())
                .limit(page_size + 1)
                .all()
            )
            return communities_pb2.ListDiscussionsRes(
                discussions=[discussion_to_pb(discussion, context) for discussion in discussions[:page_size]],
                next_page_token=_next_page_token(discussions, page_size),
            )

    def JoinCommunity(self, request, context):
        """Subscribe the calling user to the community's official cluster as a member.

        Aborts with ``FAILED_PRECONDITION`` if the user is already a member.
        """
        with session_scope() as session:
            node = _get_node_or_abort(session, context, request.community_id)

            current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
            if current_membership:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)

            node.official_cluster.cluster_subscriptions.append(
                ClusterSubscription(
                    user_id=context.user_id,
                    role=ClusterRole.member,
                )
            )

            return empty_pb2.Empty()

    def LeaveCommunity(self, request, context):
        """Remove the calling user's subscription to the community's official cluster.

        Aborts with ``FAILED_PRECONDITION`` if the user is not a member, or if
        the user's id is among ``node.contained_user_ids``.
        """
        with session_scope() as session:
            node = _get_node_or_abort(session, context, request.community_id)

            current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()

            if not current_membership:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)

            if context.user_id in node.contained_user_ids:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)

            session.execute(
                delete(ClusterSubscription)
                .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                .where(ClusterSubscription.user_id == context.user_id)
            )

            return empty_pb2.Empty()

    def ListUserCommunities(self, request, context):
        """List the communities whose official cluster a user is subscribed to, ordered by node id.

        The user is ``request.user_id``, or the calling user when that is unset.
        """
        with session_scope() as session:
            page_size = _page_size_from(request)
            next_node_id = _id_from_page_token(request)
            user_id = request.user_id or context.user_id
            nodes = (
                session.execute(
                    select(Node)
                    .join(Cluster, Cluster.parent_node_id == Node.id)
                    .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                    .where(ClusterSubscription.user_id == user_id)
                    .where(Cluster.is_official_cluster)
                    .where(Node.id >= next_node_id)
                    .order_by(Node.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )

            return communities_pb2.ListUserCommunitiesRes(
                communities=[community_to_pb(node, context) for node in nodes[:page_size]],
                next_page_token=_next_page_token(nodes, page_size),
            )