Coverage for app/backend/src/couchers/servicers/groups.py: 74%

113 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import select 

7from sqlalchemy.orm import Session 

8from sqlalchemy.sql import delete, func 

9 

10from couchers.context import CouchersContext 

11from couchers.db import can_moderate_node, get_node_parents_recursively 

12from couchers.models import ( 

13 Cluster, 

14 ClusterRole, 

15 ClusterSubscription, 

16 Discussion, 

17 Event, 

18 EventOccurrence, 

19 Page, 

20 PageType, 

21 User, 

22) 

23from couchers.proto import groups_pb2, groups_pb2_grpc 

24from couchers.servicers.discussions import discussion_to_pb 

25from couchers.servicers.events import event_to_pb 

26from couchers.servicers.pages import page_to_pb 

27from couchers.sql import users_visible, where_users_column_visible 

28from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

29 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Largest page any paginated List* RPC in this module will return; also the
# page size used when a request leaves ``page_size`` unset (falsy).
MAX_PAGINATION_LENGTH = 25

33 

34 

def _parents_to_pb(session: Session, cluster: Cluster) -> list[groups_pb2.Parent]:
    """Build the parent chain of a group as ``groups_pb2.Parent`` messages.

    The result holds one ``CommunityParent`` entry per row returned by
    ``get_node_parents_recursively`` for the group's parent node (kept in the
    order that helper returns them), followed by a single ``GroupParent``
    entry describing ``cluster`` itself.
    """
    ancestor_rows = get_node_parents_recursively(session, cluster.parent_node_id)

    chain = []
    # Each row unpacks as (node_id, parent_node_id, level, cluster); only the
    # node id and the cluster attached to that row are used here.
    for node_id, _parent_node_id, _level, node_cluster in ancestor_rows:
        community = groups_pb2.CommunityParent(
            community_id=node_id,
            name=node_cluster.name,
            slug=node_cluster.slug,
            description=node_cluster.description,
        )
        chain.append(groups_pb2.Parent(community=community))

    # The group itself is always the last element of the chain.
    own_entry = groups_pb2.GroupParent(
        group_id=cluster.id,
        name=cluster.name,
        slug=cluster.slug,
        description=cluster.description,
    )
    chain.append(groups_pb2.Parent(group=own_entry))
    return chain

57 

58 

def group_to_pb(session: Session, cluster: Cluster, context: CouchersContext) -> groups_pb2.Group:
    """Serialize a group's cluster into a ``groups_pb2.Group`` message.

    Args:
        session: Database session used for the count and membership queries.
        cluster: The cluster backing the group.
        context: Request context; ``context.user_id`` identifies the caller
            whose membership, admin status and moderation rights are reported.

    Returns:
        A ``groups_pb2.Group`` with the group's basic fields, its parent
        chain, its main page, the member/admin counts and the caller-specific
        ``member`` / ``admin`` / ``can_moderate`` flags.
    """
    can_moderate = can_moderate_node(session, context.user_id, cluster.parent_node_id)

    # Both counts are passed through where_users_column_visible, keyed on the
    # subscriber's user id, before being executed.
    member_count = session.execute(
        where_users_column_visible(
            select(func.count()).select_from(ClusterSubscription).where(ClusterSubscription.cluster_id == cluster.id),
            context,
            ClusterSubscription.user_id,
        )
    ).scalar_one()

    admin_count = session.execute(
        where_users_column_visible(
            select(func.count())
            .select_from(ClusterSubscription)
            .where(ClusterSubscription.cluster_id == cluster.id)
            .where(ClusterSubscription.role == ClusterRole.admin),
            context,
            ClusterSubscription.user_id,
        )
    ).scalar_one()

    # Fetch the caller's own subscription once and derive both flags from it,
    # rather than issuing a second, admin-filtered query for the same row.
    own_subscription = session.execute(
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == context.user_id)
        .where(ClusterSubscription.cluster_id == cluster.id)
    ).scalar_one_or_none()
    is_member = own_subscription is not None
    is_admin = is_member and own_subscription.role == ClusterRole.admin

    return groups_pb2.Group(
        group_id=cluster.id,
        name=cluster.name,
        slug=cluster.slug,
        description=cluster.description,
        created=Timestamp_from_datetime(cluster.created),
        parents=_parents_to_pb(session, cluster),
        main_page=page_to_pb(session, cluster.main_page, context),
        member=is_member,
        admin=is_admin,
        member_count=member_count,
        admin_count=admin_count,
        can_moderate=can_moderate,
    )

112 

113 

def _get_group_or_abort(session: Session, context: CouchersContext, group_id: int) -> Cluster:
    """Load the non-official cluster with id ``group_id``, or abort the RPC.

    Official clusters are excluded from the lookup, so an id belonging to one
    is reported as NOT_FOUND / "group_not_found", exactly like an unknown id.
    """
    cluster = session.execute(
        select(Cluster)
        .where(~Cluster.is_official_cluster)  # not an official group
        .where(Cluster.id == group_id)
    ).scalar_one_or_none()
    if not cluster:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found")
    return cluster


class Groups(groups_pb2_grpc.GroupsServicer):
    """gRPC servicer for groups (clusters that are not official clusters).

    Pagination convention for the id-based List* RPCs: ``page_size`` is capped
    at ``MAX_PAGINATION_LENGTH`` (which is also used when it is unset), one
    extra row is fetched, and when that extra row exists its id is returned as
    ``next_page_token``. The next request then resumes at ``id >= token``.
    """

    def GetGroup(self, request: groups_pb2.GetGroupReq, context: CouchersContext, session: Session) -> groups_pb2.Group:
        """Return the group with id ``request.group_id``; NOT_FOUND if there is none."""
        cluster = _get_group_or_abort(session, context, request.group_id)

        return group_to_pb(session, cluster, context)

    def ListAdmins(
        self, request: groups_pb2.ListAdminsReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListAdminsRes:
        """Return one page of user ids of the group's admins, ordered by user id."""
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_admin_id = int(request.page_token) if request.page_token else 0
        cluster = _get_group_or_abort(session, context, request.group_id)

        admins = (
            session.execute(
                select(User)
                .where(users_visible(context))
                .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                .where(ClusterSubscription.cluster_id == cluster.id)
                .where(ClusterSubscription.role == ClusterRole.admin)
                .where(User.id >= next_admin_id)
                .order_by(User.id)
                .limit(page_size + 1)  # one extra row tells us whether another page exists
            )
            .scalars()
            .all()
        )
        return groups_pb2.ListAdminsRes(
            admin_user_ids=[admin.id for admin in admins[:page_size]],
            next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
        )

    def ListMembers(
        self, request: groups_pb2.ListMembersReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListMembersRes:
        """Return one page of user ids of the group's subscribers, ordered by user id."""
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_member_id = int(request.page_token) if request.page_token else 0
        cluster = _get_group_or_abort(session, context, request.group_id)

        members = (
            session.execute(
                select(User)
                .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                .where(users_visible(context))
                .where(ClusterSubscription.cluster_id == cluster.id)
                .where(User.id >= next_member_id)
                .order_by(User.id)
                .limit(page_size + 1)  # one extra row tells us whether another page exists
            )
            .scalars()
            .all()
        )
        return groups_pb2.ListMembersRes(
            member_user_ids=[member.id for member in members[:page_size]],
            next_page_token=str(members[-1].id) if len(members) > page_size else None,
        )

    def ListPlaces(
        self, request: groups_pb2.ListPlacesReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListPlacesRes:
        """Return one page of the group's owned pages of type ``place``, ordered by page id."""
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_page_id = int(request.page_token) if request.page_token else 0
        cluster = _get_group_or_abort(session, context, request.group_id)
        places = (
            cluster.owned_pages.where(Page.type == PageType.place)
            .where(Page.id >= next_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)
            .all()
        )
        return groups_pb2.ListPlacesRes(
            places=[page_to_pb(session, page, context) for page in places[:page_size]],
            next_page_token=str(places[-1].id) if len(places) > page_size else None,
        )

    def ListGuides(
        self, request: groups_pb2.ListGuidesReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListGuidesRes:
        """Return one page of the group's owned pages of type ``guide``, ordered by page id."""
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_page_id = int(request.page_token) if request.page_token else 0
        cluster = _get_group_or_abort(session, context, request.group_id)
        guides = (
            cluster.owned_pages.where(Page.type == PageType.guide)
            .where(Page.id >= next_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)
            .all()
        )
        return groups_pb2.ListGuidesRes(
            guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
            next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
        )

    def ListEvents(
        self, request: groups_pb2.ListEventsReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListEventsRes:
        """Return one page of event occurrences whose event is owned by the group.

        With ``request.past`` unset, occurrences ending after the cutoff are
        returned in ascending start-time order; with it set, occurrences
        ending before the cutoff are returned in descending start-time order.
        The page token is a millisecond timestamp and defaults to the current
        time on the first page.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp of where we left off
        page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

        cluster = _get_group_or_abort(session, context, request.group_id)

        query = (
            select(EventOccurrence)
            .join(Event, Event.id == EventOccurrence.event_id)
            .where(Event.owner_cluster == cluster)
        )

        # NOTE(review): rows are filtered (and the next token is derived) by end_time but
        # ordered by start_time. Occurrences of differing durations can therefore repeat
        # or be skipped across pages; confirm whether the cursor should follow start_time.
        if not request.past:
            # widen by one second so the occurrence the token was taken from is included again
            cutoff = page_token - timedelta(seconds=1)
            query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())
        else:
            cutoff = page_token + timedelta(seconds=1)
            query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())

        query = query.limit(page_size + 1)
        occurrences = session.execute(query).scalars().all()

        return groups_pb2.ListEventsRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
        )

    def ListDiscussions(
        self, request: groups_pb2.ListDiscussionsReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListDiscussionsRes:
        """Return one page of the group's owned discussions, ordered by discussion id."""
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_page_id = int(request.page_token) if request.page_token else 0
        # A missing group is reported as "group_not_found", like every other RPC in this
        # servicer (this method previously used "community_not_found").
        cluster = _get_group_or_abort(session, context, request.group_id)
        discussions = (
            cluster.owned_discussions.where(Discussion.id >= next_page_id)
            .order_by(Discussion.id)
            .limit(page_size + 1)
            .all()
        )
        return groups_pb2.ListDiscussionsRes(
            discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
            next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
        )

    def JoinGroup(
        self, request: groups_pb2.JoinGroupReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Subscribe the caller to the group with the ``member`` role.

        Aborts with FAILED_PRECONDITION / "already_in_group" if the caller is
        already among the group's members.
        """
        cluster = _get_group_or_abort(session, context, request.group_id)

        user_in_group = cluster.members.where(User.id == context.user_id).one_or_none()
        if user_in_group:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_group")

        cluster.cluster_subscriptions.append(
            ClusterSubscription(
                user_id=context.user_id,
                cluster_id=cluster.id,
                role=ClusterRole.member,
            )
        )

        return empty_pb2.Empty()

    def LeaveGroup(
        self, request: groups_pb2.LeaveGroupReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Delete the caller's subscription to the group.

        Aborts with FAILED_PRECONDITION / "not_in_group" if the caller is not
        among the group's members.
        """
        cluster = _get_group_or_abort(session, context, request.group_id)

        user_in_group = cluster.members.where(User.id == context.user_id).one_or_none()
        if not user_in_group:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_group")

        session.execute(
            delete(ClusterSubscription)
            .where(ClusterSubscription.cluster_id == request.group_id)
            .where(ClusterSubscription.user_id == context.user_id)
        )

        return empty_pb2.Empty()

    def ListUserGroups(
        self, request: groups_pb2.ListUserGroupsReq, context: CouchersContext, session: Session
    ) -> groups_pb2.ListUserGroupsRes:
        """Return one page of the groups a user is subscribed to, ordered by cluster id.

        The user is ``request.user_id`` when set, otherwise the caller.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_cluster_id = int(request.page_token) if request.page_token else 0
        user_id = request.user_id or context.user_id
        clusters = (
            session.execute(
                select(Cluster)
                .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                .where(ClusterSubscription.user_id == user_id)
                .where(~Cluster.is_official_cluster)  # not an official group
                .where(Cluster.id >= next_cluster_id)
                .order_by(Cluster.id)
                .limit(page_size + 1)  # one extra row tells us whether another page exists
            )
            .scalars()
            .all()
        )
        return groups_pb2.ListUserGroupsRes(
            groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
            next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
        )