Coverage for app/backend/src/couchers/servicers/groups.py: 75%
117 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import select
7from sqlalchemy.orm import Session
8from sqlalchemy.sql import delete, func
10from couchers.context import CouchersContext
11from couchers.db import can_moderate_node, get_node_parents_recursively
12from couchers.event_log import log_event
13from couchers.models import (
14 Cluster,
15 ClusterRole,
16 ClusterSubscription,
17 Discussion,
18 Event,
19 EventOccurrence,
20 Page,
21 PageType,
22 User,
23)
24from couchers.proto import groups_pb2, groups_pb2_grpc
25from couchers.servicers.discussions import discussion_to_pb
26from couchers.servicers.events import event_to_pb
27from couchers.servicers.pages import page_to_pb
28from couchers.sql import users_visible, where_moderated_content_visible, where_users_column_visible
29from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Used by the list RPCs both as the default page size (when the request
# leaves page_size unset) and as the upper bound on a requested page size.
MAX_PAGINATION_LENGTH: int = 25
def _parents_to_pb(session: Session, cluster: Cluster) -> list[groups_pb2.Parent]:
    """Build the list of ``Parent`` messages for a group.

    The result contains one ``CommunityParent`` entry per row returned by
    ``get_node_parents_recursively`` for the group's parent node (in the order
    that helper returns them), followed by a single ``GroupParent`` entry
    describing the group itself.

    Args:
        session: Database session used to walk the node hierarchy.
        cluster: The group whose parents are being rendered.

    Returns:
        The community parents followed by the group's own entry.
    """
    ancestors = get_node_parents_recursively(session, cluster.parent_node_id)

    breadcrumb: list[groups_pb2.Parent] = []
    # Each row unpacks to (node_id, parent_node_id, level, cluster); only the
    # node id and that node's cluster are needed here.
    for node_id, _parent_node_id, _level, community_cluster in ancestors:
        community = groups_pb2.CommunityParent(
            community_id=node_id,
            name=community_cluster.name,
            slug=community_cluster.slug,
            description=community_cluster.description,
        )
        breadcrumb.append(groups_pb2.Parent(community=community))

    # The group itself is always the final entry.
    own_entry = groups_pb2.GroupParent(
        group_id=cluster.id,
        name=cluster.name,
        slug=cluster.slug,
        description=cluster.description,
    )
    breadcrumb.append(groups_pb2.Parent(group=own_entry))
    return breadcrumb
def group_to_pb(session: Session, cluster: Cluster, context: CouchersContext) -> groups_pb2.Group:
    """Serialize a group (non-official cluster) into a ``groups_pb2.Group``.

    Besides the cluster's own fields this computes, relative to the calling
    user (``context.user_id``): whether they can moderate the group's parent
    node, whether they are a member and/or admin, and the member/admin counts.

    Args:
        session: Database session used for all lookups.
        cluster: The group to serialize.
        context: Request context; supplies the calling user's id.

    Returns:
        The populated ``Group`` message.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: if the calling user has more than
            one subscription row for this cluster.
    """
    can_moderate = can_moderate_node(session, context.user_id, cluster.parent_node_id)

    # Base statement shared by both counts. ``Select`` objects are generative,
    # so adding the admin filter below does not modify this one.
    subscription_count = (
        select(func.count()).select_from(ClusterSubscription).where(ClusterSubscription.cluster_id == cluster.id)
    )
    # Both counts are passed through where_users_column_visible, keyed on the
    # subscribing user (presumably hiding users the caller cannot see).
    member_count = session.execute(
        where_users_column_visible(subscription_count, context, ClusterSubscription.user_id)
    ).scalar_one()
    admin_count = session.execute(
        where_users_column_visible(
            subscription_count.where(ClusterSubscription.role == ClusterRole.admin),
            context,
            ClusterSubscription.user_id,
        )
    ).scalar_one()

    # Fetch the caller's subscription once and derive both flags from it,
    # instead of issuing a separate role-filtered query for the admin flag.
    own_subscription = session.execute(
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == context.user_id)
        .where(ClusterSubscription.cluster_id == cluster.id)
    ).scalar_one_or_none()
    is_member = own_subscription is not None
    is_admin = own_subscription is not None and own_subscription.role == ClusterRole.admin

    return groups_pb2.Group(
        group_id=cluster.id,
        name=cluster.name,
        slug=cluster.slug,
        description=cluster.description,
        created=Timestamp_from_datetime(cluster.created),
        parents=_parents_to_pb(session, cluster),
        main_page=page_to_pb(session, cluster.main_page, context),
        member=is_member,
        admin=is_admin,
        member_count=member_count,
        admin_count=admin_count,
        can_moderate=can_moderate,
    )
def _clamp_page_size(requested: int) -> int:
    """Return the effective page size: ``requested`` capped at MAX_PAGINATION_LENGTH, or the maximum when it is 0."""
    return min(MAX_PAGINATION_LENGTH, requested or MAX_PAGINATION_LENGTH)


def _get_group_or_abort(session: Session, context: CouchersContext, group_id: int) -> Cluster:
    """Load the non-official cluster with the given id, aborting the RPC with NOT_FOUND if there is none."""
    cluster = session.execute(
        select(Cluster)
        .where(~Cluster.is_official_cluster)  # not an official group
        .where(Cluster.id == group_id)
    ).scalar_one_or_none()
    if not cluster:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
    return cluster


class Groups(groups_pb2_grpc.GroupsServicer):
    """gRPC servicer for groups, i.e. clusters that are not official clusters.

    Every RPC that takes a ``group_id`` aborts with ``NOT_FOUND`` /
    ``"group_not_found"`` when no matching non-official cluster exists.

    The id-based list RPCs share one pagination scheme: rows are ordered by id,
    ``page_size + 1`` rows are fetched starting at the id in ``page_token``
    (0 when absent), and if the extra row exists its id is returned as
    ``next_page_token``. A non-numeric ``page_token`` raises ``ValueError``.
    """

    def GetGroup(self, request: groups_pb2.GetGroupReq, context: CouchersContext, session: Session) -> groups_pb2.Group:
        """Return the full ``Group`` message for ``request.group_id``."""
        cluster = _get_group_or_abort(session, context, request.group_id)
        return group_to_pb(session, cluster, context)

    def ListAdmins(
        self, request: groups_pb2.ListAdminsReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListAdminsRes:
        """List the ids of the group's admins (filtered by ``users_visible``), ordered by user id."""
        page_size = _clamp_page_size(request.page_size)
        next_admin_id = int(request.page_token) if request.page_token else 0
        cluster = _get_group_or_abort(session, context, request.group_id)

        admins = (
            session.execute(
                select(User)
                .where(users_visible(context))
                .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                .where(ClusterSubscription.cluster_id == cluster.id)
                .where(ClusterSubscription.role == ClusterRole.admin)
                .where(User.id >= next_admin_id)
                .order_by(User.id)
                .limit(page_size + 1)  # one extra row to detect a following page
            )
            .scalars()
            .all()
        )
        return groups_pb2.ListAdminsRes(
            admin_user_ids=[admin.id for admin in admins[:page_size]],
            next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
        )

    def ListMembers(
        self, request: groups_pb2.ListMembersReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListMembersRes:
        """List the ids of all users subscribed to the group (filtered by ``users_visible``), ordered by user id."""
        page_size = _clamp_page_size(request.page_size)
        next_member_id = int(request.page_token) if request.page_token else 0
        cluster = _get_group_or_abort(session, context, request.group_id)

        members = (
            session.execute(
                select(User)
                .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                .where(users_visible(context))
                .where(ClusterSubscription.cluster_id == cluster.id)
                .where(User.id >= next_member_id)
                .order_by(User.id)
                .limit(page_size + 1)  # one extra row to detect a following page
            )
            .scalars()
            .all()
        )
        return groups_pb2.ListMembersRes(
            member_user_ids=[member.id for member in members[:page_size]],
            next_page_token=str(members[-1].id) if len(members) > page_size else None,
        )

    def ListPlaces(
        self, request: groups_pb2.ListPlacesReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListPlacesRes:
        """List the pages of type ``PageType.place`` owned by the group, ordered by page id."""
        page_size = _clamp_page_size(request.page_size)
        next_page_id = int(request.page_token) if request.page_token else 0
        cluster = _get_group_or_abort(session, context, request.group_id)

        places = (
            cluster.owned_pages.where(Page.type == PageType.place)
            .where(Page.id >= next_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)  # one extra row to detect a following page
            .all()
        )
        return groups_pb2.ListPlacesRes(
            places=[page_to_pb(session, page, context) for page in places[:page_size]],
            next_page_token=str(places[-1].id) if len(places) > page_size else None,
        )

    def ListGuides(
        self, request: groups_pb2.ListGuidesReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListGuidesRes:
        """List the pages of type ``PageType.guide`` owned by the group, ordered by page id."""
        page_size = _clamp_page_size(request.page_size)
        next_page_id = int(request.page_token) if request.page_token else 0
        cluster = _get_group_or_abort(session, context, request.group_id)

        guides = (
            cluster.owned_pages.where(Page.type == PageType.guide)
            .where(Page.id >= next_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)  # one extra row to detect a following page
            .all()
        )
        return groups_pb2.ListGuidesRes(
            guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
            next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
        )

    def ListEvents(
        self, request: groups_pb2.ListEventsReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListEventsRes:
        """List event occurrences whose event is owned by the group.

        With ``request.past`` unset, occurrences ending after the cutoff are
        returned in ascending start-time order; with it set, occurrences ending
        before the cutoff are returned in descending start-time order. The
        cutoff is the page token (or the current time when absent) widened by
        one second in the direction opposite to paging.
        """
        page_size = _clamp_page_size(request.page_size)
        # the page token is a unix timestamp (in milliseconds) of where we left off
        page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

        cluster = _get_group_or_abort(session, context, request.group_id)

        query = (
            select(EventOccurrence)
            .join(Event, Event.id == EventOccurrence.event_id)
            .where(Event.owner_cluster == cluster)
        )
        query = where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=True)

        # NOTE(review): rows are ordered by start_time while both the filter and
        # the next-page token use end_time -- confirm pages cannot skip or
        # repeat occurrences when the two orderings disagree.
        if not request.past:
            cutoff = page_token - timedelta(seconds=1)
            query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())
        else:
            cutoff = page_token + timedelta(seconds=1)
            query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())

        query = query.limit(page_size + 1)  # one extra row to detect a following page
        occurrences = session.execute(query).scalars().all()

        return groups_pb2.ListEventsRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
        )

    def ListDiscussions(
        self, request: groups_pb2.ListDiscussionsReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListDiscussionsRes:
        """List the discussions owned by the group, ordered by discussion id."""
        page_size = _clamp_page_size(request.page_size)
        next_page_id = int(request.page_token) if request.page_token else 0
        # A missing group is reported as "group_not_found", matching every
        # other RPC on this servicer.
        cluster = _get_group_or_abort(session, context, request.group_id)

        discussions = (
            cluster.owned_discussions.where(Discussion.id >= next_page_id)
            .order_by(Discussion.id)
            .limit(page_size + 1)  # one extra row to detect a following page
            .all()
        )
        return groups_pb2.ListDiscussionsRes(
            discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
            next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
        )

    def JoinGroup(
        self, request: groups_pb2.JoinGroupReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Subscribe the calling user to the group with the ``member`` role.

        Aborts with ``FAILED_PRECONDITION`` / ``"already_in_group"`` if the
        user is already among the group's members. Logs a ``group.joined``
        event on success.
        """
        cluster = _get_group_or_abort(session, context, request.group_id)

        user_in_group = cluster.members.where(User.id == context.user_id).one_or_none()
        if user_in_group:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_group")

        cluster.cluster_subscriptions.append(
            ClusterSubscription(
                user_id=context.user_id,
                cluster_id=cluster.id,
                role=ClusterRole.member,
            )
        )

        log_event(context, session, "group.joined", {"group_id": cluster.id, "group_name": cluster.name})

        return empty_pb2.Empty()

    def LeaveGroup(
        self, request: groups_pb2.LeaveGroupReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Remove the calling user's subscription to the group.

        Aborts with ``FAILED_PRECONDITION`` / ``"not_in_group"`` if the user is
        not among the group's members. Logs a ``group.left`` event on success.
        """
        cluster = _get_group_or_abort(session, context, request.group_id)

        user_in_group = cluster.members.where(User.id == context.user_id).one_or_none()
        if not user_in_group:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_group")

        session.execute(
            delete(ClusterSubscription)
            .where(ClusterSubscription.cluster_id == request.group_id)
            .where(ClusterSubscription.user_id == context.user_id)
        )

        log_event(context, session, "group.left", {"group_id": cluster.id, "group_name": cluster.name})

        return empty_pb2.Empty()

    def ListUserGroups(
        self, request: groups_pb2.ListUserGroupsReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListUserGroupsRes:
        """List the groups a user is subscribed to, ordered by cluster id.

        The user is ``request.user_id`` when set, otherwise the calling user.
        """
        page_size = _clamp_page_size(request.page_size)
        next_cluster_id = int(request.page_token) if request.page_token else 0
        user_id = request.user_id or context.user_id
        clusters = (
            session.execute(
                select(Cluster)
                .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                .where(ClusterSubscription.user_id == user_id)
                .where(~Cluster.is_official_cluster)  # not an official group
                .where(Cluster.id >= next_cluster_id)
                .order_by(Cluster.id)
                .limit(page_size + 1)  # one extra row to detect a following page
            )
            .scalars()
            .all()
        )
        return groups_pb2.ListUserGroupsRes(
            groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
            next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
        )