Coverage for app / backend / src / couchers / servicers / groups.py: 75%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import select 

7from sqlalchemy.orm import Session 

8from sqlalchemy.sql import delete, func 

9 

10from couchers.context import CouchersContext 

11from couchers.db import can_moderate_node, get_node_parents_recursively 

12from couchers.event_log import log_event 

13from couchers.models import ( 

14 Cluster, 

15 ClusterRole, 

16 ClusterSubscription, 

17 Discussion, 

18 Event, 

19 EventOccurrence, 

20 Page, 

21 PageType, 

22 User, 

23) 

24from couchers.proto import groups_pb2, groups_pb2_grpc 

25from couchers.servicers.discussions import discussion_to_pb 

26from couchers.servicers.events import event_to_pb 

27from couchers.servicers.pages import page_to_pb 

28from couchers.sql import users_visible, where_moderated_content_visible, where_users_column_visible 

29from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

30 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound on the page size of every paginated RPC in this module; also used
# as the page size when a request leaves `page_size` unset (zero).
MAX_PAGINATION_LENGTH = 25

34 

35 

def _parents_to_pb(session: Session, cluster: Cluster) -> list[groups_pb2.Parent]:
    """Build the list of `Parent` messages for a group.

    The rows returned by `get_node_parents_recursively` for the group's parent
    node are emitted first, each as a `CommunityParent`, in the order that
    helper returns them. The group itself is appended last as a `GroupParent`.
    """
    ancestor_rows = get_node_parents_recursively(session, cluster.parent_node_id)

    chain = []
    # Each row is a 4-tuple; only the node id and that node's cluster are used here.
    for node_id, _parent_node_id, _level, community_cluster in ancestor_rows:
        community = groups_pb2.CommunityParent(
            community_id=node_id,
            name=community_cluster.name,
            slug=community_cluster.slug,
            description=community_cluster.description,
        )
        chain.append(groups_pb2.Parent(community=community))

    # The group is always the final element of the chain.
    own_entry = groups_pb2.GroupParent(
        group_id=cluster.id,
        name=cluster.name,
        slug=cluster.slug,
        description=cluster.description,
    )
    chain.append(groups_pb2.Parent(group=own_entry))
    return chain

58 

59 

def group_to_pb(session: Session, cluster: Cluster, context: CouchersContext) -> groups_pb2.Group:
    """Serialize a group (a `Cluster`) into a `groups_pb2.Group` for the calling user.

    Args:
        session: database session used for all lookups.
        cluster: the cluster backing the group.
        context: request context; `context.user_id` identifies the caller.

    Returns:
        A `Group` message with the cluster's basic fields, its parent chain and
        main page, member/admin counts (restricted through
        `where_users_column_visible`), and the caller's own `member`, `admin`
        and `can_moderate` flags.
    """
    can_moderate = can_moderate_node(session, context.user_id, cluster.parent_node_id)

    # Shared base for both counts: every subscription row of this cluster.
    subscription_count = (
        select(func.count()).select_from(ClusterSubscription).where(ClusterSubscription.cluster_id == cluster.id)
    )
    member_count = session.execute(
        where_users_column_visible(subscription_count, context, ClusterSubscription.user_id)
    ).scalar_one()
    admin_count = session.execute(
        where_users_column_visible(
            subscription_count.where(ClusterSubscription.role == ClusterRole.admin),
            context,
            ClusterSubscription.user_id,
        )
    ).scalar_one()

    # Fetch the caller's own subscription once and derive both flags from it,
    # rather than issuing one query for membership and a second for admin-ship.
    own_subscription = session.execute(
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == context.user_id)
        .where(ClusterSubscription.cluster_id == cluster.id)
    ).scalar_one_or_none()
    is_member = own_subscription is not None
    is_admin = own_subscription is not None and own_subscription.role == ClusterRole.admin

    return groups_pb2.Group(
        group_id=cluster.id,
        name=cluster.name,
        slug=cluster.slug,
        description=cluster.description,
        created=Timestamp_from_datetime(cluster.created),
        parents=_parents_to_pb(session, cluster),
        main_page=page_to_pb(session, cluster.main_page, context),
        member=is_member,
        admin=is_admin,
        member_count=member_count,
        admin_count=admin_count,
        can_moderate=can_moderate,
    )

113 

114 

115class Groups(groups_pb2_grpc.GroupsServicer): 

116 def GetGroup(self, request: groups_pb2.GetGroupReq, context: CouchersContext, session: Session) -> groups_pb2.Group: 

117 cluster = session.execute( 

118 select(Cluster) 

119 .where(~Cluster.is_official_cluster) # not an official group 

120 .where(Cluster.id == request.group_id) 

121 ).scalar_one_or_none() 

122 if not cluster: 122 ↛ 123line 122 didn't jump to line 123 because the condition on line 122 was never true

123 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

124 

125 return group_to_pb(session, cluster, context) 

126 

127 def ListAdmins( 

128 self, request: groups_pb2.ListAdminsReq, context: CouchersContext, session: Session 

129 ) -> groups_pb2.ListAdminsRes: 

130 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

131 next_admin_id = int(request.page_token) if request.page_token else 0 

132 cluster = session.execute( 

133 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

134 ).scalar_one_or_none() 

135 if not cluster: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

137 

138 admins = ( 

139 session.execute( 

140 select(User) 

141 .where(users_visible(context)) 

142 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

143 .where(ClusterSubscription.cluster_id == cluster.id) 

144 .where(ClusterSubscription.role == ClusterRole.admin) 

145 .where(User.id >= next_admin_id) 

146 .order_by(User.id) 

147 .limit(page_size + 1) 

148 ) 

149 .scalars() 

150 .all() 

151 ) 

152 return groups_pb2.ListAdminsRes( 

153 admin_user_ids=[admin.id for admin in admins[:page_size]], 

154 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

155 ) 

156 

157 def ListMembers( 

158 self, request: groups_pb2.ListMembersReq, context: CouchersContext, session: Session 

159 ) -> groups_pb2.ListMembersRes: 

160 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

161 next_member_id = int(request.page_token) if request.page_token else 0 

162 cluster = session.execute( 

163 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

164 ).scalar_one_or_none() 

165 if not cluster: 165 ↛ 166line 165 didn't jump to line 166 because the condition on line 165 was never true

166 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

167 

168 members = ( 

169 session.execute( 

170 select(User) 

171 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

172 .where(users_visible(context)) 

173 .where(ClusterSubscription.cluster_id == cluster.id) 

174 .where(User.id >= next_member_id) 

175 .order_by(User.id) 

176 .limit(page_size + 1) 

177 ) 

178 .scalars() 

179 .all() 

180 ) 

181 return groups_pb2.ListMembersRes( 

182 member_user_ids=[member.id for member in members[:page_size]], 

183 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

184 ) 

185 

186 def ListPlaces( 

187 self, request: groups_pb2.ListPlacesReq, context: CouchersContext, session: Session 

188 ) -> groups_pb2.ListPlacesRes: 

189 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

190 next_page_id = int(request.page_token) if request.page_token else 0 

191 cluster = session.execute( 

192 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

193 ).scalar_one_or_none() 

194 if not cluster: 

195 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

196 places = ( 

197 cluster.owned_pages.where(Page.type == PageType.place) 

198 .where(Page.id >= next_page_id) 

199 .order_by(Page.id) 

200 .limit(page_size + 1) 

201 .all() 

202 ) 

203 return groups_pb2.ListPlacesRes( 

204 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

205 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

206 ) 

207 

208 def ListGuides( 

209 self, request: groups_pb2.ListGuidesReq, context: CouchersContext, session: Session 

210 ) -> groups_pb2.ListGuidesRes: 

211 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

212 next_page_id = int(request.page_token) if request.page_token else 0 

213 cluster = session.execute( 

214 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

215 ).scalar_one_or_none() 

216 if not cluster: 

217 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

218 guides = ( 

219 cluster.owned_pages.where(Page.type == PageType.guide) 

220 .where(Page.id >= next_page_id) 

221 .order_by(Page.id) 

222 .limit(page_size + 1) 

223 .all() 

224 ) 

225 return groups_pb2.ListGuidesRes( 

226 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

227 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

228 ) 

229 

230 def ListEvents( 

231 self, request: groups_pb2.ListEventsReq, context: CouchersContext, session: Session 

232 ) -> groups_pb2.ListEventsRes: 

233 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

234 # the page token is a unix timestamp of where we left off 

235 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

236 

237 cluster = session.execute( 

238 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

239 ).scalar_one_or_none() 

240 if not cluster: 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true

241 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

242 

243 query = ( 

244 select(EventOccurrence) 

245 .join(Event, Event.id == EventOccurrence.event_id) 

246 .where(Event.owner_cluster == cluster) 

247 ) 

248 query = where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=True) 

249 

250 if not request.past: 250 ↛ 254line 250 didn't jump to line 254 because the condition on line 250 was always true

251 cutoff = page_token - timedelta(seconds=1) 

252 query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc()) 

253 else: 

254 cutoff = page_token + timedelta(seconds=1) 

255 query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc()) 

256 

257 query = query.limit(page_size + 1) 

258 occurrences = session.execute(query).scalars().all() 

259 

260 return groups_pb2.ListEventsRes( 

261 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

262 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

263 ) 

264 

265 def ListDiscussions( 

266 self, request: groups_pb2.ListDiscussionsReq, context: CouchersContext, session: Session 

267 ) -> groups_pb2.ListDiscussionsRes: 

268 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

269 next_page_id = int(request.page_token) if request.page_token else 0 

270 cluster = session.execute( 

271 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

272 ).scalar_one_or_none() 

273 if not cluster: 273 ↛ 274line 273 didn't jump to line 274 because the condition on line 273 was never true

274 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

275 discussions = ( 

276 cluster.owned_discussions.where(Discussion.id >= next_page_id) 

277 .order_by(Discussion.id) 

278 .limit(page_size + 1) 

279 .all() 

280 ) 

281 return groups_pb2.ListDiscussionsRes( 

282 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]], 

283 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

284 ) 

285 

286 def JoinGroup( 

287 self, request: groups_pb2.JoinGroupReq, context: CouchersContext, session: Session 

288 ) -> empty_pb2.Empty: 

289 cluster = session.execute( 

290 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

291 ).scalar_one_or_none() 

292 if not cluster: 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true

293 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

294 

295 user_in_group = cluster.members.where(User.id == context.user_id).one_or_none() 

296 if user_in_group: 

297 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_group") 

298 

299 cluster.cluster_subscriptions.append( 

300 ClusterSubscription( 

301 user_id=context.user_id, 

302 cluster_id=cluster.id, 

303 role=ClusterRole.member, 

304 ) 

305 ) 

306 

307 log_event(context, session, "group.joined", {"group_id": cluster.id, "group_name": cluster.name}) 

308 

309 return empty_pb2.Empty() 

310 

311 def LeaveGroup( 

312 self, request: groups_pb2.LeaveGroupReq, context: CouchersContext, session: Session 

313 ) -> empty_pb2.Empty: 

314 cluster = session.execute( 

315 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

316 ).scalar_one_or_none() 

317 if not cluster: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true

318 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

319 

320 user_in_group = cluster.members.where(User.id == context.user_id).one_or_none() 

321 if not user_in_group: 

322 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_group") 

323 

324 session.execute( 

325 delete(ClusterSubscription) 

326 .where(ClusterSubscription.cluster_id == request.group_id) 

327 .where(ClusterSubscription.user_id == context.user_id) 

328 ) 

329 

330 log_event(context, session, "group.left", {"group_id": cluster.id, "group_name": cluster.name}) 

331 

332 return empty_pb2.Empty() 

333 

334 def ListUserGroups( 

335 self, request: groups_pb2.ListUserGroupsReq, context: CouchersContext, session: Session 

336 ) -> groups_pb2.ListUserGroupsRes: 

337 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

338 next_cluster_id = int(request.page_token) if request.page_token else 0 

339 user_id = request.user_id or context.user_id 

340 clusters = ( 

341 session.execute( 

342 select(Cluster) 

343 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

344 .where(ClusterSubscription.user_id == user_id) 

345 .where(~Cluster.is_official_cluster) # not an official group 

346 .where(Cluster.id >= next_cluster_id) 

347 .order_by(Cluster.id) 

348 .limit(page_size + 1) 

349 ) 

350 .scalars() 

351 .all() 

352 ) 

353 return groups_pb2.ListUserGroupsRes( 

354 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]], 

355 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

356 )