Coverage for src/couchers/servicers/groups.py: 80%
108 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy.sql import delete, func
8from couchers.db import can_moderate_node, get_node_parents_recursively
9from couchers.models import (
10 Cluster,
11 ClusterRole,
12 ClusterSubscription,
13 Discussion,
14 Event,
15 EventOccurrence,
16 Page,
17 PageType,
18 User,
19)
20from couchers.proto import groups_pb2, groups_pb2_grpc
21from couchers.servicers.discussions import discussion_to_pb
22from couchers.servicers.events import event_to_pb
23from couchers.servicers.pages import page_to_pb
24from couchers.sql import couchers_select as select
25from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now
27logger = logging.getLogger(__name__)
29MAX_PAGINATION_LENGTH = 25
32def _parents_to_pb(session, cluster: Cluster):
33 parents = get_node_parents_recursively(session, cluster.parent_node_id)
34 return [
35 groups_pb2.Parent(
36 community=groups_pb2.CommunityParent(
37 community_id=node_id,
38 name=cluster.name,
39 slug=cluster.slug,
40 description=cluster.description,
41 )
42 )
43 for node_id, parent_node_id, level, cluster in parents
44 ] + [
45 groups_pb2.Parent(
46 group=groups_pb2.GroupParent(
47 group_id=cluster.id,
48 name=cluster.name,
49 slug=cluster.slug,
50 description=cluster.description,
51 )
52 )
53 ]
56def group_to_pb(session, cluster: Cluster, context):
57 can_moderate = can_moderate_node(session, context.user_id, cluster.parent_node_id)
59 member_count = session.execute(
60 select(func.count())
61 .select_from(ClusterSubscription)
62 .where_users_column_visible(context, ClusterSubscription.user_id)
63 .where(ClusterSubscription.cluster_id == cluster.id)
64 ).scalar_one()
65 is_member = (
66 session.execute(
67 select(ClusterSubscription)
68 .where(ClusterSubscription.user_id == context.user_id)
69 .where(ClusterSubscription.cluster_id == cluster.id)
70 ).scalar_one_or_none()
71 is not None
72 )
74 admin_count = session.execute(
75 select(func.count())
76 .select_from(ClusterSubscription)
77 .where_users_column_visible(context, ClusterSubscription.user_id)
78 .where(ClusterSubscription.cluster_id == cluster.id)
79 .where(ClusterSubscription.role == ClusterRole.admin)
80 ).scalar_one()
81 is_admin = (
82 session.execute(
83 select(ClusterSubscription)
84 .where(ClusterSubscription.user_id == context.user_id)
85 .where(ClusterSubscription.cluster_id == cluster.id)
86 .where(ClusterSubscription.role == ClusterRole.admin)
87 ).scalar_one_or_none()
88 is not None
89 )
91 return groups_pb2.Group(
92 group_id=cluster.id,
93 name=cluster.name,
94 slug=cluster.slug,
95 description=cluster.description,
96 created=Timestamp_from_datetime(cluster.created),
97 parents=_parents_to_pb(session, cluster),
98 main_page=page_to_pb(session, cluster.main_page, context),
99 member=is_member,
100 admin=is_admin,
101 member_count=member_count,
102 admin_count=admin_count,
103 can_moderate=can_moderate,
104 )
107class Groups(groups_pb2_grpc.GroupsServicer):
108 def GetGroup(self, request, context, session):
109 cluster = session.execute(
110 select(Cluster)
111 .where(~Cluster.is_official_cluster) # not an official group
112 .where(Cluster.id == request.group_id)
113 ).scalar_one_or_none()
114 if not cluster:
115 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
117 return group_to_pb(session, cluster, context)
119 def ListAdmins(self, request, context, session):
120 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
121 next_admin_id = int(request.page_token) if request.page_token else 0
122 cluster = session.execute(
123 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
124 ).scalar_one_or_none()
125 if not cluster:
126 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
128 admins = (
129 session.execute(
130 select(User)
131 .where_users_visible(context)
132 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
133 .where(ClusterSubscription.cluster_id == cluster.id)
134 .where(ClusterSubscription.role == ClusterRole.admin)
135 .where(User.id >= next_admin_id)
136 .order_by(User.id)
137 .limit(page_size + 1)
138 )
139 .scalars()
140 .all()
141 )
142 return groups_pb2.ListAdminsRes(
143 admin_user_ids=[admin.id for admin in admins[:page_size]],
144 next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
145 )
147 def ListMembers(self, request, context, session):
148 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
149 next_member_id = int(request.page_token) if request.page_token else 0
150 cluster = session.execute(
151 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
152 ).scalar_one_or_none()
153 if not cluster:
154 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
156 members = (
157 session.execute(
158 select(User)
159 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
160 .where_users_visible(context)
161 .where(ClusterSubscription.cluster_id == cluster.id)
162 .where(User.id >= next_member_id)
163 .order_by(User.id)
164 .limit(page_size + 1)
165 )
166 .scalars()
167 .all()
168 )
169 return groups_pb2.ListMembersRes(
170 member_user_ids=[member.id for member in members[:page_size]],
171 next_page_token=str(members[-1].id) if len(members) > page_size else None,
172 )
174 def ListPlaces(self, request, context, session):
175 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
176 next_page_id = int(request.page_token) if request.page_token else 0
177 cluster = session.execute(
178 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
179 ).scalar_one_or_none()
180 if not cluster:
181 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
182 places = (
183 cluster.owned_pages.where(Page.type == PageType.place)
184 .where(Page.id >= next_page_id)
185 .order_by(Page.id)
186 .limit(page_size + 1)
187 .all()
188 )
189 return groups_pb2.ListPlacesRes(
190 places=[page_to_pb(session, page, context) for page in places[:page_size]],
191 next_page_token=str(places[-1].id) if len(places) > page_size else None,
192 )
194 def ListGuides(self, request, context, session):
195 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
196 next_page_id = int(request.page_token) if request.page_token else 0
197 cluster = session.execute(
198 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
199 ).scalar_one_or_none()
200 if not cluster:
201 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
202 guides = (
203 cluster.owned_pages.where(Page.type == PageType.guide)
204 .where(Page.id >= next_page_id)
205 .order_by(Page.id)
206 .limit(page_size + 1)
207 .all()
208 )
209 return groups_pb2.ListGuidesRes(
210 guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
211 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
212 )
214 def ListEvents(self, request, context, session):
215 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
216 # the page token is a unix timestamp of where we left off
217 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
219 cluster = session.execute(
220 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
221 ).scalar_one_or_none()
222 if not cluster:
223 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
225 occurrences = (
226 select(EventOccurrence)
227 .join(Event, Event.id == EventOccurrence.event_id)
228 .where(Event.owner_cluster == cluster)
229 )
231 if not request.past:
232 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
233 EventOccurrence.start_time.asc()
234 )
235 else:
236 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
237 EventOccurrence.start_time.desc()
238 )
240 occurrences = occurrences.limit(page_size + 1)
241 occurrences = session.execute(occurrences).scalars().all()
243 return groups_pb2.ListEventsRes(
244 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
245 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
246 )
248 def ListDiscussions(self, request, context, session):
249 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
250 next_page_id = int(request.page_token) if request.page_token else 0
251 cluster = session.execute(
252 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
253 ).scalar_one_or_none()
254 if not cluster:
255 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
256 discussions = (
257 cluster.owned_discussions.where(Discussion.id >= next_page_id)
258 .order_by(Discussion.id)
259 .limit(page_size + 1)
260 .all()
261 )
262 return groups_pb2.ListDiscussionsRes(
263 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
264 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
265 )
267 def JoinGroup(self, request, context, session):
268 cluster = session.execute(
269 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
270 ).scalar_one_or_none()
271 if not cluster:
272 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
274 user_in_group = cluster.members.where(User.id == context.user_id).one_or_none()
275 if user_in_group:
276 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_group")
278 cluster.cluster_subscriptions.append(
279 ClusterSubscription(
280 user_id=context.user_id,
281 role=ClusterRole.member,
282 )
283 )
285 return empty_pb2.Empty()
287 def LeaveGroup(self, request, context, session):
288 cluster = session.execute(
289 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id)
290 ).scalar_one_or_none()
291 if not cluster:
292 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
294 user_in_group = cluster.members.where(User.id == context.user_id).one_or_none()
295 if not user_in_group:
296 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_group")
298 session.execute(
299 delete(ClusterSubscription)
300 .where(ClusterSubscription.cluster_id == request.group_id)
301 .where(ClusterSubscription.user_id == context.user_id)
302 )
304 return empty_pb2.Empty()
306 def ListUserGroups(self, request, context, session):
307 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
308 next_cluster_id = int(request.page_token) if request.page_token else 0
309 user_id = request.user_id or context.user_id
310 clusters = (
311 session.execute(
312 select(Cluster)
313 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
314 .where(ClusterSubscription.user_id == user_id)
315 .where(~Cluster.is_official_cluster) # not an official group
316 .where(Cluster.id >= next_cluster_id)
317 .order_by(Cluster.id)
318 .limit(page_size + 1)
319 )
320 .scalars()
321 .all()
322 )
323 return groups_pb2.ListUserGroupsRes(
324 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
325 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
326 )