Coverage for src/couchers/servicers/communities.py: 83%

171 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-09 00:05 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers import errors 

9from couchers.crypto import decrypt_page_token, encrypt_page_token 

10from couchers.db import can_moderate_node, get_node_parents_recursively 

11from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts 

12from couchers.models import ( 

13 Cluster, 

14 ClusterRole, 

15 ClusterSubscription, 

16 Discussion, 

17 Event, 

18 EventOccurrence, 

19 Node, 

20 Page, 

21 PageType, 

22 User, 

23) 

24from couchers.servicers.discussions import discussion_to_pb 

25from couchers.servicers.events import event_to_pb 

26from couchers.servicers.groups import group_to_pb 

27from couchers.servicers.pages import page_to_pb 

28from couchers.sql import couchers_select as select 

29from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

30from proto import communities_pb2, communities_pb2_grpc, groups_pb2 

31 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound on the page size of the paginated RPCs in this module; it is also
# the page size used when a request leaves `page_size` unset (zero).
MAX_PAGINATION_LENGTH = 25

35 

36 

def _parents_to_pb(session, node_id):
    """
    Builds the list of `groups_pb2.Parent` messages for the given node.

    Every row returned by `get_node_parents_recursively` becomes one `Parent`
    wrapping a `CommunityParent` that carries the row's node id together with
    the name, slug and description of the row's cluster. The order of the
    rows is preserved.
    """
    parent_pbs = []
    for parent_id, _parent_node_id, _level, cluster in get_node_parents_recursively(session, node_id):
        community_parent = groups_pb2.CommunityParent(
            community_id=parent_id,
            name=cluster.name,
            slug=cluster.slug,
            description=cluster.description,
        )
        parent_pbs.append(groups_pb2.Parent(community=community_parent))
    return parent_pbs

50 

51 

def communities_to_pb(session, nodes: list[Node], context):
    """
    Converts nodes into `communities_pb2.Community` messages, in the same order.

    Member/admin counts and the requesting user's own membership/admin status
    are fetched in bulk for all official clusters of `nodes` rather than once
    per node. Moderation rights, parents and the main page are still resolved
    per node.

    Args:
        session: database session used for all queries.
        nodes: the nodes to convert.
        context: request context; `context.user_id` is the requesting user.

    Returns:
        A list with one `Community` message per entry of `nodes`.
    """
    if not nodes:
        # nothing to convert, so don't bother the database with empty IN () queries
        return []

    can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes]

    official_clusters = [node.official_cluster for node in nodes]
    official_cluster_ids = [cluster.id for cluster in official_clusters]

    member_counts = dict(
        session.execute(
            select(cluster_subscription_counts.c.cluster_id, cluster_subscription_counts.c.count).where(
                cluster_subscription_counts.c.cluster_id.in_(official_cluster_ids)
            )
        ).all()
    )
    admin_counts = dict(
        session.execute(
            select(cluster_admin_counts.c.cluster_id, cluster_admin_counts.c.count).where(
                cluster_admin_counts.c.cluster_id.in_(official_cluster_ids)
            )
        ).all()
    )

    # a single query over the user's subscriptions yields both the clusters they belong to and the ones they
    # administer, instead of running the same query twice with and without a role filter
    user_subscriptions = session.execute(
        select(ClusterSubscription.cluster_id, ClusterSubscription.role)
        .where(ClusterSubscription.user_id == context.user_id)
        .where(ClusterSubscription.cluster_id.in_(official_cluster_ids))
    ).all()
    cluster_memberships = {cluster_id for cluster_id, _role in user_subscriptions}
    cluster_adminships = {cluster_id for cluster_id, role in user_subscriptions if role == ClusterRole.admin}

    return [
        communities_pb2.Community(
            community_id=node.id,
            name=official_cluster.name,
            slug=official_cluster.slug,
            description=official_cluster.description,
            created=Timestamp_from_datetime(node.created),
            parents=_parents_to_pb(session, node.id),
            member=official_cluster.id in cluster_memberships,
            admin=official_cluster.id in cluster_adminships,
            # NOTE(review): clusters without a row in the counts views are reported with a count of 1 -- confirm
            # this fallback is intended
            member_count=member_counts.get(official_cluster.id, 1),
            admin_count=admin_counts.get(official_cluster.id, 1),
            main_page=page_to_pb(session, official_cluster.main_page, context),
            can_moderate=can_moderate,
        )
        for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates)
    ]

110 

111 

def community_to_pb(session, node: Node, context):
    """
    Converts a single node into a `communities_pb2.Community` message.

    Delegates to `communities_to_pb` with a one-element list and returns the
    first (only) message it produces.
    """
    converted = communities_to_pb(session, [node], context)
    return converted[0]

114 

115 

def _get_node_or_abort(session, context, community_id):
    """
    Returns the `Node` with the given id, aborting the RPC with NOT_FOUND if there is no such node.
    """
    node = session.execute(select(Node).where(Node.id == community_id)).scalar_one_or_none()
    if not node:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    return node


def _get_page_size(request):
    """
    Returns the page size for a paginated request: `request.page_size` capped at `MAX_PAGINATION_LENGTH`, or
    `MAX_PAGINATION_LENGTH` when the request leaves it unset (zero).
    """
    return min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)


def _get_subscription_for_admin_change(session, context, request):
    """
    Shared preamble of AddAdmin/RemoveAdmin: returns the `ClusterSubscription` of `request.user_id` in the official
    cluster of `request.community_id`.

    Aborts the RPC, in this order, if the community does not exist (NOT_FOUND), the requesting user may not moderate
    it (FAILED_PRECONDITION), the target user is not found among the users visible to the requester (NOT_FOUND), or
    the target user has no subscription to the community's official cluster (FAILED_PRECONDITION).
    """
    node = _get_node_or_abort(session, context, request.community_id)
    if not can_moderate_node(session, context.user_id, node.id):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)

    user = session.execute(
        select(User).where_users_visible(context).where(User.id == request.user_id)
    ).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    subscription = session.execute(
        select(ClusterSubscription)
        .where(ClusterSubscription.user_id == user.id)
        .where(ClusterSubscription.cluster_id == node.official_cluster.id)
    ).scalar_one_or_none()
    if not subscription:
        # a role can only be changed for someone who is already a member
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
    return subscription


class Communities(communities_pb2_grpc.CommunitiesServicer):
    """
    gRPC servicer for communities: lookup, listings of a community's contents and people, admin management, and
    joining/leaving.

    The paginated RPCs fetch `page_size + 1` rows: the extra row is never returned, it only signals that another
    page exists and (except for ListCommunities, which uses an encrypted offset) supplies the next page token.
    """

    def GetCommunity(self, request, context, session):
        """
        Returns the community with id `request.community_id`; aborts with NOT_FOUND if it does not exist.
        """
        node = _get_node_or_abort(session, context, request.community_id)
        return community_to_pb(session, node, context)

    def ListCommunities(self, request, context, session):
        """
        Lists nodes that have an official cluster, ordered by cluster name.

        If `request.community_id` is non-zero only direct children of that node are listed; zero lists all of them.
        The page token is an encrypted row offset.
        """
        page_size = _get_page_size(request)
        offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0

        query = select(Node).join(Cluster, Cluster.parent_node_id == Node.id).where(Cluster.is_official_cluster)
        if request.community_id != 0:
            # only restrict to a parent when one was asked for, rather than sending a constant boolean to the database
            query = query.where(Node.parent_node_id == request.community_id)
        nodes = session.execute(query.order_by(Cluster.name).limit(page_size + 1).offset(offset)).scalars().all()

        return communities_pb2.ListCommunitiesRes(
            communities=communities_to_pb(session, nodes[:page_size], context),
            next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None,
        )

    def ListGroups(self, request, context, session):
        """
        Lists the non-official clusters (groups) whose parent node is `request.community_id`, ordered by cluster id.

        The page token is the id of the first cluster of the next page.
        """
        page_size = _get_page_size(request)
        next_cluster_id = int(request.page_token) if request.page_token else 0
        clusters = (
            session.execute(
                select(Cluster)
                .where(~Cluster.is_official_cluster)  # not an official group
                .where(Cluster.parent_node_id == request.community_id)
                .where(Cluster.id >= next_cluster_id)
                .order_by(Cluster.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListGroupsRes(
            groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
            next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
        )

    def ListAdmins(self, request, context, session):
        """
        Lists the ids of visible users who are admins of the community's official cluster, ordered by user id.

        Aborts with NOT_FOUND if the community does not exist. The page token is the first user id of the next page.
        """
        page_size = _get_page_size(request)
        next_admin_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)
        admins = (
            session.execute(
                select(User)
                .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                .where_users_visible(context)
                .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                .where(ClusterSubscription.role == ClusterRole.admin)
                .where(User.id >= next_admin_id)
                .order_by(User.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListAdminsRes(
            admin_user_ids=[admin.id for admin in admins[:page_size]],
            next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
        )

    def AddAdmin(self, request, context, session):
        """
        Promotes `request.user_id` from member to admin of the community's official cluster.

        Besides the shared checks (community exists, requester can moderate, user visible and a member), aborts
        with FAILED_PRECONDITION if the user is already an admin.
        """
        subscription = _get_subscription_for_admin_change(session, context, request)
        if subscription.role == ClusterRole.admin:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN)

        subscription.role = ClusterRole.admin

        return empty_pb2.Empty()

    def RemoveAdmin(self, request, context, session):
        """
        Demotes `request.user_id` from admin to member of the community's official cluster.

        Besides the shared checks (community exists, requester can moderate, user visible and a member), aborts
        with FAILED_PRECONDITION if the user is only a member.
        """
        subscription = _get_subscription_for_admin_change(session, context, request)
        if subscription.role == ClusterRole.member:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

        subscription.role = ClusterRole.member

        return empty_pb2.Empty()

    def ListMembers(self, request, context, session):
        """
        Lists the ids of visible users subscribed to the community's official cluster, highest user id first.

        Aborts with NOT_FOUND if the community does not exist. The page token is the first user id of the next page.
        """
        page_size = _get_page_size(request)
        next_member_id = int(request.page_token) if request.page_token else None

        node = _get_node_or_abort(session, context, request.community_id)

        query = (
            select(User)
            .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
            .where_users_visible(context)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
        )
        if next_member_id is not None:
            query = query.where(User.id <= next_member_id)
        members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()

        return communities_pb2.ListMembersRes(
            member_user_ids=[member.id for member in members[:page_size]],
            next_page_token=str(members[-1].id) if len(members) > page_size else None,
        )

    def ListNearbyUsers(self, request, context, session):
        """
        Lists the ids of visible users whose geometry is contained in the community node's geometry, ordered by
        user id.

        Aborts with NOT_FOUND if the community does not exist. The page token is the first user id of the next page.
        """
        page_size = _get_page_size(request)
        next_nearby_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)
        nearbys = (
            session.execute(
                select(User)
                .where_users_visible(context)
                .where(func.ST_Contains(node.geom, User.geom))
                .where(User.id >= next_nearby_id)
                .order_by(User.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListNearbyUsersRes(
            nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
            next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None,
        )

    def ListPlaces(self, request, context, session):
        """
        Lists the pages of type `place` owned by the community's official cluster, ordered by page id.

        Aborts with NOT_FOUND if the community does not exist. The page token is the first page id of the next page.
        """
        page_size = _get_page_size(request)
        next_page_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)
        places = (
            node.official_cluster.owned_pages.where(Page.type == PageType.place)
            .where(Page.id >= next_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)
            .all()
        )
        return communities_pb2.ListPlacesRes(
            places=[page_to_pb(session, page, context) for page in places[:page_size]],
            next_page_token=str(places[-1].id) if len(places) > page_size else None,
        )

    def ListGuides(self, request, context, session):
        """
        Lists the pages of type `guide` owned by the community's official cluster, ordered by page id.

        Aborts with NOT_FOUND if the community does not exist. The page token is the first page id of the next page.
        """
        page_size = _get_page_size(request)
        next_page_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)
        guides = (
            node.official_cluster.owned_pages.where(Page.type == PageType.guide)
            .where(Page.id >= next_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)
            .all()
        )
        return communities_pb2.ListGuidesRes(
            guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
            next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
        )

    def ListEvents(self, request, context, session):
        """
        Lists event occurrences of events owned by the community's official cluster or parented by its node.

        With `request.past` set, occurrences that ended before the page token are returned, latest start first;
        otherwise occurrences ending after the page token, earliest start first. Without a page token the
        current time is used. Aborts with NOT_FOUND if the community does not exist.
        """
        page_size = _get_page_size(request)
        # the page token is a unix timestamp of where we left off
        page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

        node = _get_node_or_abort(session, context, request.community_id)

        # for communities, we list events owned by this community or for which this is a parent
        occurrences = (
            select(EventOccurrence)
            .join(Event, Event.id == EventOccurrence.event_id)
            .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node))
        )

        # NOTE(review): rows are ordered by start_time while the filter and the next page token use end_time --
        # confirm pages cannot skip or repeat occurrences whose end order differs from their start order
        if request.past:
            occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.asc()
            )

        occurrences = occurrences.limit(page_size + 1)
        occurrences = session.execute(occurrences).scalars().all()

        return communities_pb2.ListEventsRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
        )

    def ListDiscussions(self, request, context, session):
        """
        Lists the discussions owned by the community's official cluster, highest discussion id first.

        Aborts with NOT_FOUND if the community does not exist. The page token is the first discussion id of the
        next page; without one, listing starts from the highest id.
        """
        page_size = _get_page_size(request)
        next_page_id = int(request.page_token) if request.page_token else 0
        node = _get_node_or_abort(session, context, request.community_id)

        query = node.official_cluster.owned_discussions
        if next_page_id != 0:
            # only bound the ids when resuming from a token, rather than sending a constant boolean to the database
            query = query.where(Discussion.id <= next_page_id)
        discussions = query.order_by(Discussion.id.desc()).limit(page_size + 1).all()

        return communities_pb2.ListDiscussionsRes(
            discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
            next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
        )

    def JoinCommunity(self, request, context, session):
        """
        Subscribes the requesting user to the community's official cluster with the `member` role.

        Aborts with NOT_FOUND if the community does not exist and with FAILED_PRECONDITION if the user is already
        a member.
        """
        node = _get_node_or_abort(session, context, request.community_id)

        current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
        if current_membership:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)

        node.official_cluster.cluster_subscriptions.append(
            ClusterSubscription(
                user_id=context.user_id,
                role=ClusterRole.member,
            )
        )

        return empty_pb2.Empty()

    def LeaveCommunity(self, request, context, session):
        """
        Removes the requesting user's subscription to the community's official cluster.

        Aborts with NOT_FOUND if the community does not exist, and with FAILED_PRECONDITION if the user is not a
        member or if their id is in `node.contained_user_ids`.
        """
        node = _get_node_or_abort(session, context, request.community_id)

        current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()

        if not current_membership:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)

        if context.user_id in node.contained_user_ids:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)

        session.execute(
            delete(ClusterSubscription)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
            .where(ClusterSubscription.user_id == context.user_id)
        )

        return empty_pb2.Empty()

    def ListUserCommunities(self, request, context, session):
        """
        Lists the communities whose official cluster a user is subscribed to, ordered by node id.

        The user is `request.user_id`, falling back to the requesting user when it is unset. The page token is the
        first node id of the next page.
        """
        page_size = _get_page_size(request)
        next_node_id = int(request.page_token) if request.page_token else 0
        user_id = request.user_id or context.user_id
        nodes = (
            session.execute(
                select(Node)
                .join(Cluster, Cluster.parent_node_id == Node.id)
                .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                .where(ClusterSubscription.user_id == user_id)
                .where(Cluster.is_official_cluster)
                .where(Node.id >= next_node_id)
                .order_by(Node.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        return communities_pb2.ListUserCommunitiesRes(
            communities=communities_to_pb(session, nodes[:page_size], context),
            next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
        )