Coverage for src/couchers/servicers/communities.py: 82%

182 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-09-14 15:31 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers import errors 

9from couchers.crypto import decrypt_page_token, encrypt_page_token 

10from couchers.db import can_moderate_node, get_node_parents_recursively 

11from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount 

12from couchers.models import ( 

13 Cluster, 

14 ClusterRole, 

15 ClusterSubscription, 

16 Discussion, 

17 Event, 

18 EventOccurrence, 

19 Node, 

20 Page, 

21 PageType, 

22 User, 

23) 

24from couchers.servicers.discussions import discussion_to_pb 

25from couchers.servicers.events import event_to_pb 

26from couchers.servicers.groups import group_to_pb 

27from couchers.servicers.pages import page_to_pb 

28from couchers.sql import couchers_select as select 

29from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

30from proto import communities_pb2, communities_pb2_grpc, groups_pb2 

31 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound (and default) for the number of items a single list RPC in this
# module returns per page.
MAX_PAGINATION_LENGTH = 25

35 

36 

def _parents_to_pb(session, node_id):
    """
    Build the ``groups_pb2.Parent`` list describing the ancestry of a node.

    One ``Parent`` (wrapping a ``CommunityParent``) is produced per row returned
    by ``get_node_parents_recursively(session, node_id)``, in the order that
    helper yields them. Each row is unpacked as
    ``(node id, parent node id, level, cluster)``; only the node id and the
    cluster are used here.
    """
    ancestry = []
    for ancestor_id, _parent_node_id, _level, cluster in get_node_parents_recursively(session, node_id):
        community = groups_pb2.CommunityParent(
            community_id=ancestor_id,
            name=cluster.name,
            slug=cluster.slug,
            description=cluster.description,
        )
        ancestry.append(groups_pb2.Parent(community=community))
    return ancestry

50 

51 

def communities_to_pb(session, nodes: list[Node], context):
    """
    Convert a list of nodes into ``communities_pb2.Community`` messages.

    The member/admin counts and the calling user's own membership/adminship are
    each fetched with a single query covering the official clusters of all
    ``nodes``. Moderation rights, parents and the main page are still resolved
    per node.

    Returns one message per node, in the same order as ``nodes``.
    """
    viewer_id = context.user_id
    moderation_flags = [can_moderate_node(session, viewer_id, node.id) for node in nodes]

    clusters = [node.official_cluster for node in nodes]
    cluster_ids = [cluster.id for cluster in clusters]

    # cluster id -> number of subscriptions, read from the materialized view
    member_count_rows = session.execute(
        select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where(
            ClusterSubscriptionCount.cluster_id.in_(cluster_ids)
        )
    ).all()
    member_counts = dict(member_count_rows)

    # the calling user's subscriptions among the clusters of interest
    own_subscriptions = (
        select(ClusterSubscription.cluster_id)
        .where(ClusterSubscription.user_id == viewer_id)
        .where(ClusterSubscription.cluster_id.in_(cluster_ids))
    )
    joined_cluster_ids = set(session.execute(own_subscriptions).scalars().all())

    # cluster id -> number of admins, read from the materialized view
    admin_count_rows = session.execute(
        select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where(
            ClusterAdminCount.cluster_id.in_(cluster_ids)
        )
    ).all()
    admin_counts = dict(admin_count_rows)

    administered_cluster_ids = set(
        session.execute(own_subscriptions.where(ClusterSubscription.role == ClusterRole.admin)).scalars().all()
    )

    communities = []
    for node, cluster, may_moderate in zip(nodes, clusters, moderation_flags):
        communities.append(
            communities_pb2.Community(
                community_id=node.id,
                name=cluster.name,
                slug=cluster.slug,
                description=cluster.description,
                created=Timestamp_from_datetime(node.created),
                parents=_parents_to_pb(session, node.id),
                member=cluster.id in joined_cluster_ids,
                admin=cluster.id in administered_cluster_ids,
                # clusters with no row in the count views are reported as 1
                member_count=member_counts.get(cluster.id, 1),
                admin_count=admin_counts.get(cluster.id, 1),
                main_page=page_to_pb(session, cluster.main_page, context),
                can_moderate=may_moderate,
                discussions_enabled=cluster.discussions_enabled,
                events_enabled=cluster.events_enabled,
            )
        )
    return communities

112 

113 

def community_to_pb(session, node: Node, context):
    """Convert a single node into a ``communities_pb2.Community`` via ``communities_to_pb``."""
    converted = communities_to_pb(session, [node], context)
    return converted[0]

116 

117 

def _clamp_page_size(request):
    """Return ``request.page_size``, defaulting to and capped at ``MAX_PAGINATION_LENGTH``."""
    return min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)


def _get_community_node_or_abort(session, context, community_id):
    """Load the ``Node`` with id ``community_id``; call ``context.abort`` with ``NOT_FOUND`` if none exists."""
    node = session.execute(select(Node).where(Node.id == community_id)).scalar_one_or_none()
    if not node:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    return node


class Communities(communities_pb2_grpc.CommunitiesServicer):
    """
    Servicer for the ``Communities`` RPCs.

    Every handler receives the request message, the call ``context`` (used for
    ``context.user_id`` and ``context.abort``) and a database ``session``.

    The list RPCs fetch ``page_size + 1`` rows: the extra row only signals that
    another page exists and, for id-based cursors, supplies the next page token.
    """

    def GetCommunity(self, request, context, session):
        """Return the community with id ``request.community_id``, or abort with ``NOT_FOUND``."""
        node = _get_community_node_or_abort(session, context, request.community_id)

        return community_to_pb(session, node, context)

    def ListCommunities(self, request, context, session):
        """
        List official-cluster communities ordered by cluster name.

        Restricts to children of ``request.community_id`` unless it is 0, in
        which case no parent filter is applied. The page token is an encrypted
        row offset.
        """
        page_size = _clamp_page_size(request)
        offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0
        nodes = (
            session.execute(
                select(Node)
                .join(Cluster, Cluster.parent_node_id == Node.id)
                .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
                .where(Cluster.is_official_cluster)
                # Node.id is a tiebreaker: with OFFSET paging, rows that share a
                # name could otherwise be ordered differently on each query and
                # so be repeated or skipped across pages.
                .order_by(Cluster.name, Node.id)
                .limit(page_size + 1)
                .offset(offset)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListCommunitiesRes(
            communities=communities_to_pb(session, nodes[:page_size], context),
            next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None,
        )

    def ListGroups(self, request, context, session):
        """
        List the non-official clusters whose parent node is ``request.community_id``.

        Ordered by ascending cluster id; the page token is the id of the first
        cluster of the next page.
        """
        page_size = _clamp_page_size(request)
        next_cluster_id = int(request.page_token) if request.page_token else 0
        clusters = (
            session.execute(
                select(Cluster)
                .where(~Cluster.is_official_cluster)  # not an official group
                .where(Cluster.parent_node_id == request.community_id)
                .where(Cluster.id >= next_cluster_id)
                .order_by(Cluster.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListGroupsRes(
            groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]],
            next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None,
        )

    def ListAdmins(self, request, context, session):
        """
        List ids of visible users with the admin role in the community's official cluster.

        Ordered by ascending user id; the page token is the id of the first
        admin of the next page. Aborts with ``NOT_FOUND`` for an unknown community.
        """
        page_size = _clamp_page_size(request)
        next_admin_id = int(request.page_token) if request.page_token else 0
        node = _get_community_node_or_abort(session, context, request.community_id)
        admins = (
            session.execute(
                select(User)
                .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                .where_users_visible(context)
                .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                .where(ClusterSubscription.role == ClusterRole.admin)
                .where(User.id >= next_admin_id)
                .order_by(User.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListAdminsRes(
            admin_user_ids=[admin.id for admin in admins[:page_size]],
            next_page_token=str(admins[-1].id) if len(admins) > page_size else None,
        )

    def AddAdmin(self, request, context, session):
        """
        Promote an existing member of the community's official cluster to admin.

        Aborts with ``NOT_FOUND`` if the community or the (visible) user does
        not exist, and with ``FAILED_PRECONDITION`` if the caller cannot
        moderate the node, the user is not a member, or is already an admin.
        """
        node = _get_community_node_or_abort(session, context, request.community_id)
        if not can_moderate_node(session, context.user_id, node.id):
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)

        user = session.execute(
            select(User).where_users_visible(context).where(User.id == request.user_id)
        ).scalar_one_or_none()
        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        subscription = session.execute(
            select(ClusterSubscription)
            .where(ClusterSubscription.user_id == user.id)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
        ).scalar_one_or_none()
        if not subscription:
            # Can't upgrade a member to admin if they're not already a member
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
        if subscription.role == ClusterRole.admin:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN)

        subscription.role = ClusterRole.admin

        return empty_pb2.Empty()

    def RemoveAdmin(self, request, context, session):
        """
        Demote an admin of the community's official cluster back to member.

        Aborts with ``NOT_FOUND`` if the community or the (visible) user does
        not exist, and with ``FAILED_PRECONDITION`` if the caller cannot
        moderate the node, the user is not a member, or only has the member role.
        """
        node = _get_community_node_or_abort(session, context, request.community_id)
        if not can_moderate_node(session, context.user_id, node.id):
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)

        user = session.execute(
            select(User).where_users_visible(context).where(User.id == request.user_id)
        ).scalar_one_or_none()
        if not user:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

        subscription = session.execute(
            select(ClusterSubscription)
            .where(ClusterSubscription.user_id == user.id)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
        ).scalar_one_or_none()
        if not subscription:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
        if subscription.role == ClusterRole.member:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

        subscription.role = ClusterRole.member

        return empty_pb2.Empty()

    def ListMembers(self, request, context, session):
        """
        List ids of visible users subscribed to the community's official cluster.

        Ordered by descending user id; the page token is the id of the first
        member of the next page. Aborts with ``NOT_FOUND`` for an unknown community.
        """
        page_size = _clamp_page_size(request)
        next_member_id = int(request.page_token) if request.page_token else None

        node = _get_community_node_or_abort(session, context, request.community_id)

        query = (
            select(User)
            .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
            .where_users_visible(context)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
        )
        if next_member_id is not None:
            # descending order, so later pages hold ids at or below the cursor
            query = query.where(User.id <= next_member_id)
        members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()

        return communities_pb2.ListMembersRes(
            member_user_ids=[member.id for member in members[:page_size]],
            next_page_token=str(members[-1].id) if len(members) > page_size else None,
        )

    def ListNearbyUsers(self, request, context, session):
        """
        List ids of visible users for which ``ST_Contains(node.geom, User.geom)`` holds.

        Ordered by ascending user id; the page token is the id of the first
        user of the next page. Aborts with ``NOT_FOUND`` for an unknown community.
        """
        page_size = _clamp_page_size(request)
        next_nearby_id = int(request.page_token) if request.page_token else 0
        node = _get_community_node_or_abort(session, context, request.community_id)
        nearbys = (
            session.execute(
                select(User)
                .where_users_visible(context)
                .where(func.ST_Contains(node.geom, User.geom))
                .where(User.id >= next_nearby_id)
                .order_by(User.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )
        return communities_pb2.ListNearbyUsersRes(
            nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
            next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None,
        )

    def ListPlaces(self, request, context, session):
        """
        List pages of type ``PageType.place`` owned by the community's official cluster.

        Ordered by ascending page id; the page token is the id of the first
        page of the next result page. Aborts with ``NOT_FOUND`` for an unknown community.
        """
        page_size = _clamp_page_size(request)
        next_page_id = int(request.page_token) if request.page_token else 0
        node = _get_community_node_or_abort(session, context, request.community_id)
        places = (
            node.official_cluster.owned_pages.where(Page.type == PageType.place)
            .where(Page.id >= next_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)
            .all()
        )
        return communities_pb2.ListPlacesRes(
            places=[page_to_pb(session, page, context) for page in places[:page_size]],
            next_page_token=str(places[-1].id) if len(places) > page_size else None,
        )

    def ListGuides(self, request, context, session):
        """
        List pages of type ``PageType.guide`` owned by the community's official cluster.

        Ordered by ascending page id; the page token is the id of the first
        page of the next result page. Aborts with ``NOT_FOUND`` for an unknown community.
        """
        page_size = _clamp_page_size(request)
        next_page_id = int(request.page_token) if request.page_token else 0
        node = _get_community_node_or_abort(session, context, request.community_id)
        guides = (
            node.official_cluster.owned_pages.where(Page.type == PageType.guide)
            .where(Page.id >= next_page_id)
            .order_by(Page.id)
            .limit(page_size + 1)
            .all()
        )
        return communities_pb2.ListGuidesRes(
            guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
            next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
        )

    def ListEvents(self, request, context, session):
        """
        List event occurrences owned by, or parented at, the community.

        With ``request.include_parents`` the search covers every node/cluster
        returned by ``get_node_parents_recursively`` for this node. With
        ``request.past`` occurrences are returned newest-start first and
        filtered to those ending before the cursor; otherwise oldest-start
        first and filtered to those ending after it.

        Aborts with ``NOT_FOUND`` for an unknown community and with
        ``FAILED_PRECONDITION`` if events are disabled on its official cluster.
        """
        page_size = _clamp_page_size(request)
        # the page token is a timestamp in milliseconds of where we left off
        page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

        node = _get_community_node_or_abort(session, context, request.community_id)
        if not node.official_cluster.events_enabled:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENTS_NOT_ENABLED)

        if not request.include_parents:
            nodes_clusters_to_search = [(node.id, node.official_cluster)]
        else:
            # the first value is the node_id, the last is the cluster (object)
            nodes_clusters_to_search = [
                (parent[0], parent[3]) for parent in get_node_parents_recursively(session, node.id)
            ]

        membership_clauses = []
        for node_id, official_cluster_obj in nodes_clusters_to_search:
            membership_clauses.append(Event.owner_cluster == official_cluster_obj)
            membership_clauses.append(Event.parent_node_id == node_id)

        # for communities, we list events owned by this community or for which this is a parent
        occurrences = (
            select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*membership_clauses))
        )

        # NOTE(review): the cursor filters on end_time while the ordering is by
        # start_time; with overlapping occurrences this looks like it can repeat
        # or skip items across pages. Confirm before relying on exact paging.
        if request.past:
            occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.asc()
            )

        occurrences = occurrences.limit(page_size + 1)
        occurrences = session.execute(occurrences).scalars().all()

        return communities_pb2.ListEventsRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
        )

    def ListDiscussions(self, request, context, session):
        """
        List discussions owned by the community's official cluster.

        Ordered by descending discussion id; the page token is the id of the
        first discussion of the next page (0 / empty means start from the top).
        Aborts with ``NOT_FOUND`` for an unknown community and with
        ``FAILED_PRECONDITION`` if discussions are disabled on its official cluster.
        """
        page_size = _clamp_page_size(request)
        next_page_id = int(request.page_token) if request.page_token else 0
        node = _get_community_node_or_abort(session, context, request.community_id)
        if not node.official_cluster.discussions_enabled:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.DISCUSSIONS_NOT_ENABLED)
        discussions = (
            node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
            .order_by(Discussion.id.desc())
            .limit(page_size + 1)
            .all()
        )
        return communities_pb2.ListDiscussionsRes(
            discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]],
            next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None,
        )

    def JoinCommunity(self, request, context, session):
        """
        Subscribe the calling user to the community's official cluster as a member.

        Aborts with ``NOT_FOUND`` for an unknown community and with
        ``FAILED_PRECONDITION`` if the caller is already a member.
        """
        node = _get_community_node_or_abort(session, context, request.community_id)

        current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
        if current_membership:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)

        node.official_cluster.cluster_subscriptions.append(
            ClusterSubscription(
                user_id=context.user_id,
                role=ClusterRole.member,
            )
        )

        return empty_pb2.Empty()

    def LeaveCommunity(self, request, context, session):
        """
        Delete the calling user's subscription to the community's official cluster.

        Aborts with ``NOT_FOUND`` for an unknown community and with
        ``FAILED_PRECONDITION`` if the caller is not a member or their id is in
        ``node.contained_user_ids``.
        """
        node = _get_community_node_or_abort(session, context, request.community_id)

        current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()

        if not current_membership:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)

        if context.user_id in node.contained_user_ids:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)

        session.execute(
            delete(ClusterSubscription)
            .where(ClusterSubscription.cluster_id == node.official_cluster.id)
            .where(ClusterSubscription.user_id == context.user_id)
        )

        return empty_pb2.Empty()

    def ListUserCommunities(self, request, context, session):
        """
        List the communities whose official cluster a user is subscribed to.

        The user is ``request.user_id`` when set, otherwise the caller. Ordered
        by ascending node id; the page token is the id of the first node of the
        next page.
        """
        page_size = _clamp_page_size(request)
        next_node_id = int(request.page_token) if request.page_token else 0
        user_id = request.user_id or context.user_id
        nodes = (
            session.execute(
                select(Node)
                .join(Cluster, Cluster.parent_node_id == Node.id)
                .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                .where(ClusterSubscription.user_id == user_id)
                .where(Cluster.is_official_cluster)
                .where(Node.id >= next_node_id)
                .order_by(Node.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        return communities_pb2.ListUserCommunitiesRes(
            communities=communities_to_pb(session, nodes[:page_size], context),
            next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None,
        )