Coverage for app / backend / src / couchers / servicers / groups.py: 74%
113 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import select
7from sqlalchemy.orm import Session
8from sqlalchemy.sql import delete, func
10from couchers.context import CouchersContext
11from couchers.db import can_moderate_node, get_node_parents_recursively
12from couchers.models import (
13 Cluster,
14 ClusterRole,
15 ClusterSubscription,
16 Discussion,
17 Event,
18 EventOccurrence,
19 Page,
20 PageType,
21 User,
22)
23from couchers.proto import groups_pb2, groups_pb2_grpc
24from couchers.servicers.discussions import discussion_to_pb
25from couchers.servicers.events import event_to_pb
26from couchers.servicers.pages import page_to_pb
27from couchers.sql import users_visible, where_users_column_visible
28from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
30logger = logging.getLogger(__name__)
32MAX_PAGINATION_LENGTH = 25
35def _parents_to_pb(session: Session, cluster: Cluster) -> list[groups_pb2.Parent]:
36 parents = get_node_parents_recursively(session, cluster.parent_node_id)
37 return [
38 groups_pb2.Parent(
39 community=groups_pb2.CommunityParent(
40 community_id=node_id,
41 name=cluster.name,
42 slug=cluster.slug,
43 description=cluster.description,
44 )
45 )
46 for node_id, parent_node_id, level, cluster in parents
47 ] + [
48 groups_pb2.Parent(
49 group=groups_pb2.GroupParent(
50 group_id=cluster.id,
51 name=cluster.name,
52 slug=cluster.slug,
53 description=cluster.description,
54 )
55 )
56 ]
59def group_to_pb(session: Session, cluster: Cluster, context: CouchersContext) -> groups_pb2.Group:
60 can_moderate = can_moderate_node(session, context.user_id, cluster.parent_node_id)
62 member_count = session.execute(
63 where_users_column_visible(
64 select(func.count()).select_from(ClusterSubscription).where(ClusterSubscription.cluster_id == cluster.id),
65 context,
66 ClusterSubscription.user_id,
67 )
68 ).scalar_one()
69 is_member = (
70 session.execute(
71 select(ClusterSubscription)
72 .where(ClusterSubscription.user_id == context.user_id)
73 .where(ClusterSubscription.cluster_id == cluster.id)
74 ).scalar_one_or_none()
75 is not None
76 )
78 admin_count = session.execute(
79 where_users_column_visible(
80 select(func.count())
81 .select_from(ClusterSubscription)
82 .where(ClusterSubscription.cluster_id == cluster.id)
83 .where(ClusterSubscription.role == ClusterRole.admin),
84 context,
85 ClusterSubscription.user_id,
86 )
87 ).scalar_one()
88 is_admin = (
89 session.execute(
90 select(ClusterSubscription)
91 .where(ClusterSubscription.user_id == context.user_id)
92 .where(ClusterSubscription.cluster_id == cluster.id)
93 .where(ClusterSubscription.role == ClusterRole.admin)
94 ).scalar_one_or_none()
95 is not None
96 )
98 return groups_pb2.Group(
99 group_id=cluster.id,
100 name=cluster.name,
101 slug=cluster.slug,
102 description=cluster.description,
103 created=Timestamp_from_datetime(cluster.created),
104 parents=_parents_to_pb(session, cluster),
105 main_page=page_to_pb(session, cluster.main_page, context),
106 member=is_member,
107 admin=is_admin,
108 member_count=member_count,
109 admin_count=admin_count,
110 can_moderate=can_moderate,
111 )
114class Groups(groups_pb2_grpc.GroupsServicer):
115 def GetGroup(self, request: groups_pb2.GetGroupReq, context: CouchersContext, session: Session) -> groups_pb2.Group:
116 cluster = session.execute(
117 select(Cluster)
118 .where(~Cluster.is_official_cluster) # not an official group
119 .where(Cluster.id == request.group_id)
120 ).scalar_one_or_none()
121 if not cluster: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
124 return group_to_pb(session, cluster, context)
126 def ListAdmins(
127 self, request: groups_pb2.ListAdminsReq, context: CouchersContext, session: Session
128 ) -> groups_pb2.ListAdminsRes:
129 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
130 next_admin_id = int(request.page_token) if request.page_token else 0
131 cluster = session.execute(
132 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
133 ).scalar_one_or_none()
134 if not cluster: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
137 admins = (
138 session.execute(
139 select(User)
140 .where(users_visible(context))
141 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
142 .where(ClusterSubscription.cluster_id == cluster.id)
143 .where(ClusterSubscription.role == ClusterRole.admin)
144 .where(User.id >= next_admin_id)
145 .order_by(User.id)
146 .limit(page_size + 1)
147 )
148 .scalars()
149 .all()
150 )
151 return groups_pb2.ListAdminsRes(
152 admin_user_ids=[admin.id for admin in admins[:page_size]],
153 next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
154 )
156 def ListMembers(
157 self, request: groups_pb2.ListMembersReq, context: CouchersContext, session: Session
158 ) -> groups_pb2.ListMembersRes:
159 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
160 next_member_id = int(request.page_token) if request.page_token else 0
161 cluster = session.execute(
162 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
163 ).scalar_one_or_none()
164 if not cluster: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
167 members = (
168 session.execute(
169 select(User)
170 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
171 .where(users_visible(context))
172 .where(ClusterSubscription.cluster_id == cluster.id)
173 .where(User.id >= next_member_id)
174 .order_by(User.id)
175 .limit(page_size + 1)
176 )
177 .scalars()
178 .all()
179 )
180 return groups_pb2.ListMembersRes(
181 member_user_ids=[member.id for member in members[:page_size]],
182 next_page_token=str(members[-1].id) if len(members) > page_size else None,
183 )
185 def ListPlaces(
186 self, request: groups_pb2.ListPlacesReq, context: CouchersContext, session: Session
187 ) -> groups_pb2.ListPlacesRes:
188 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
189 next_page_id = int(request.page_token) if request.page_token else 0
190 cluster = session.execute(
191 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
192 ).scalar_one_or_none()
193 if not cluster:
194 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
195 places = (
196 cluster.owned_pages.where(Page.type == PageType.place)
197 .where(Page.id >= next_page_id)
198 .order_by(Page.id)
199 .limit(page_size + 1)
200 .all()
201 )
202 return groups_pb2.ListPlacesRes(
203 places=[page_to_pb(session, page, context) for page in places[:page_size]],
204 next_page_token=str(places[-1].id) if len(places) > page_size else None,
205 )
207 def ListGuides(
208 self, request: groups_pb2.ListGuidesReq, context: CouchersContext, session: Session
209 ) -> groups_pb2.ListGuidesRes:
210 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
211 next_page_id = int(request.page_token) if request.page_token else 0
212 cluster = session.execute(
213 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
214 ).scalar_one_or_none()
215 if not cluster:
216 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
217 guides = (
218 cluster.owned_pages.where(Page.type == PageType.guide)
219 .where(Page.id >= next_page_id)
220 .order_by(Page.id)
221 .limit(page_size + 1)
222 .all()
223 )
224 return groups_pb2.ListGuidesRes(
225 guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
226 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
227 )
229 def ListEvents(
230 self, request: groups_pb2.ListEventsReq, context: CouchersContext, session: Session
231 ) -> groups_pb2.ListEventsRes:
232 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
233 # the page token is a unix timestamp of where we left off
234 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
236 cluster = session.execute(
237 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
238 ).scalar_one_or_none()
239 if not cluster: 239 ↛ 240line 239 didn't jump to line 240 because the condition on line 239 was never true
240 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
242 query = (
243 select(EventOccurrence)
244 .join(Event, Event.id == EventOccurrence.event_id)
245 .where(Event.owner_cluster == cluster)
246 )
248 if not request.past: 248 ↛ 252line 248 didn't jump to line 252 because the condition on line 248 was always true
249 cutoff = page_token - timedelta(seconds=1)
250 query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())
251 else:
252 cutoff = page_token + timedelta(seconds=1)
253 query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())
255 query = query.limit(page_size + 1)
256 occurrences = session.execute(query).scalars().all()
258 return groups_pb2.ListEventsRes(
259 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
260 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
261 )
263 def ListDiscussions(
264 self, request: groups_pb2.ListDiscussionsReq, context: CouchersContext, session: Session
265 ) -> groups_pb2.ListDiscussionsRes:
266 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
267 next_page_id = int(request.page_token) if request.page_token else 0
268 cluster = session.execute(
269 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
270 ).scalar_one_or_none()
271 if not cluster: 271 ↛ 272line 271 didn't jump to line 272 because the condition on line 271 was never true
272 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
273 discussions = (
274 cluster.owned_discussions.where(Discussion.id >= next_page_id)
275 .order_by(Discussion.id)
276 .limit(page_size + 1)
277 .all()
278 )
279 return groups_pb2.ListDiscussionsRes(
280 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
281 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
282 )
284 def JoinGroup(
285 self, request: groups_pb2.JoinGroupReq, context: CouchersContext, session: Session
286 ) -> empty_pb2.Empty:
287 cluster = session.execute(
288 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
289 ).scalar_one_or_none()
290 if not cluster: 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true
291 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
293 user_in_group = cluster.members.where(User.id == context.user_id).one_or_none()
294 if user_in_group:
295 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_group")
297 cluster.cluster_subscriptions.append(
298 ClusterSubscription(
299 user_id=context.user_id,
300 cluster_id=cluster.id,
301 role=ClusterRole.member,
302 )
303 )
305 return empty_pb2.Empty()
307 def LeaveGroup(
308 self, request: groups_pb2.LeaveGroupReq, context: CouchersContext, session: Session
309 ) -> empty_pb2.Empty:
310 cluster = session.execute(
311 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
312 ).scalar_one_or_none()
313 if not cluster: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true
314 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
316 user_in_group = cluster.members.where(User.id == context.user_id).one_or_none()
317 if not user_in_group:
318 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_group")
320 session.execute(
321 delete(ClusterSubscription)
322 .where(ClusterSubscription.cluster_id == request.group_id)
323 .where(ClusterSubscription.user_id == context.user_id)
324 )
326 return empty_pb2.Empty()
328 def ListUserGroups(
329 self, request: groups_pb2.ListUserGroupsReq, context: CouchersContext, session: Session
330 ) -> groups_pb2.ListUserGroupsRes:
331 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
332 next_cluster_id = int(request.page_token) if request.page_token else 0
333 user_id = request.user_id or context.user_id
334 clusters = (
335 session.execute(
336 select(Cluster)
337 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
338 .where(ClusterSubscription.user_id == user_id)
339 .where(~Cluster.is_official_cluster) # not an official group
340 .where(Cluster.id >= next_cluster_id)
341 .order_by(Cluster.id)
342 .limit(page_size + 1)
343 )
344 .scalars()
345 .all()
346 )
347 return groups_pb2.ListUserGroupsRes(
348 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
349 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
350 )