Coverage for src/couchers/servicers/communities.py: 83%

168 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-06-01 02:39 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers import errors 

9from couchers.crypto import decrypt_page_token, encrypt_page_token 

10from couchers.db import can_moderate_node, get_node_parents_recursively 

11from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts 

12from couchers.models import ( 

13 Cluster, 

14 ClusterRole, 

15 ClusterSubscription, 

16 Discussion, 

17 Event, 

18 EventOccurrence, 

19 Node, 

20 Page, 

21 PageType, 

22 User, 

23) 

24from couchers.servicers.discussions import discussion_to_pb 

25from couchers.servicers.events import event_to_pb 

26from couchers.servicers.groups import group_to_pb 

27from couchers.servicers.pages import page_to_pb 

28from couchers.sql import couchers_select as select 

29from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

30from proto import communities_pb2, communities_pb2_grpc, groups_pb2 

31 

32logger = logging.getLogger(__name__) 

33 

34MAX_PAGINATION_LENGTH = 25 

35 

36 

37def _parents_to_pb(session, node_id): 

38 parents = get_node_parents_recursively(session, node_id) 

39 return [ 

40 groups_pb2.Parent( 

41 community=groups_pb2.CommunityParent( 

42 community_id=node_id, 

43 name=cluster.name, 

44 slug=cluster.slug, 

45 description=cluster.description, 

46 ) 

47 ) 

48 for node_id, parent_node_id, level, cluster in parents 

49 ] 

50 

51 

52def communities_to_pb(session, nodes: list[Node], context): 

53 can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes] 

54 

55 official_clusters = [node.official_cluster for node in nodes] 

56 official_cluster_ids = [cluster.id for cluster in official_clusters] 

57 

58 member_counts = dict( 

59 session.execute( 

60 select(cluster_subscription_counts.c.cluster_id, cluster_subscription_counts.c.count).where( 

61 cluster_subscription_counts.c.cluster_id.in_(official_cluster_ids) 

62 ) 

63 ).all() 

64 ) 

65 cluster_memberships = set( 

66 session.execute( 

67 select(ClusterSubscription.cluster_id) 

68 .where(ClusterSubscription.user_id == context.user_id) 

69 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

70 ) 

71 .scalars() 

72 .all() 

73 ) 

74 

75 admin_counts = dict( 

76 session.execute( 

77 select(cluster_admin_counts.c.cluster_id, cluster_admin_counts.c.count).where( 

78 cluster_admin_counts.c.cluster_id.in_(official_cluster_ids) 

79 ) 

80 ).all() 

81 ) 

82 cluster_adminships = set( 

83 session.execute( 

84 select(ClusterSubscription.cluster_id) 

85 .where(ClusterSubscription.user_id == context.user_id) 

86 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

87 .where(ClusterSubscription.role == ClusterRole.admin) 

88 ) 

89 .scalars() 

90 .all() 

91 ) 

92 

93 return [ 

94 communities_pb2.Community( 

95 community_id=node.id, 

96 name=official_cluster.name, 

97 slug=official_cluster.slug, 

98 description=official_cluster.description, 

99 created=Timestamp_from_datetime(node.created), 

100 parents=_parents_to_pb(session, node.id), 

101 member=official_cluster.id in cluster_memberships, 

102 admin=official_cluster.id in cluster_adminships, 

103 member_count=member_counts.get(official_cluster.id, 1), 

104 admin_count=admin_counts.get(official_cluster.id, 1), 

105 main_page=page_to_pb(session, official_cluster.main_page, context), 

106 can_moderate=can_moderate, 

107 ) 

108 for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates) 

109 ] 

110 

111 

112def community_to_pb(session, node: Node, context): 

113 return communities_to_pb(session, [node], context)[0] 

114 

115 

116class Communities(communities_pb2_grpc.CommunitiesServicer): 

117 def GetCommunity(self, request, context, session): 

118 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

119 if not node: 

120 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

121 

122 return community_to_pb(session, node, context) 

123 

124 def ListCommunities(self, request, context, session): 

125 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

126 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0 

127 nodes = ( 

128 session.execute( 

129 select(Node) 

130 .join(Cluster, Cluster.parent_node_id == Node.id) 

131 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0)) 

132 .where(Cluster.is_official_cluster) 

133 .order_by(Cluster.name) 

134 .limit(page_size + 1) 

135 .offset(offset) 

136 ) 

137 .scalars() 

138 .all() 

139 ) 

140 return communities_pb2.ListCommunitiesRes( 

141 communities=communities_to_pb(session, nodes[:page_size], context), 

142 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None, 

143 ) 

144 

145 def ListGroups(self, request, context, session): 

146 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

147 next_cluster_id = int(request.page_token) if request.page_token else 0 

148 clusters = ( 

149 session.execute( 

150 select(Cluster) 

151 .where(~Cluster.is_official_cluster) # not an official group 

152 .where(Cluster.parent_node_id == request.community_id) 

153 .where(Cluster.id >= next_cluster_id) 

154 .order_by(Cluster.id) 

155 .limit(page_size + 1) 

156 ) 

157 .scalars() 

158 .all() 

159 ) 

160 return communities_pb2.ListGroupsRes( 

161 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]], 

162 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

163 ) 

164 

165 def ListAdmins(self, request, context, session): 

166 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

167 next_admin_id = int(request.page_token) if request.page_token else 0 

168 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

169 if not node: 

170 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

171 admins = ( 

172 session.execute( 

173 select(User) 

174 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

175 .where_users_visible(context) 

176 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

177 .where(ClusterSubscription.role == ClusterRole.admin) 

178 .where(User.id >= next_admin_id) 

179 .order_by(User.id) 

180 .limit(page_size + 1) 

181 ) 

182 .scalars() 

183 .all() 

184 ) 

185 return communities_pb2.ListAdminsRes( 

186 admin_user_ids=[admin.id for admin in admins[:page_size]], 

187 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

188 ) 

189 

190 def AddAdmin(self, request, context, session): 

191 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

192 if not node: 

193 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

194 if not can_moderate_node(session, context.user_id, node.id): 

195 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED) 

196 

197 user = session.execute( 

198 select(User).where_users_visible(context).where(User.id == request.user_id) 

199 ).scalar_one_or_none() 

200 if not user: 

201 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

202 

203 subscription = session.execute( 

204 select(ClusterSubscription) 

205 .where(ClusterSubscription.user_id == user.id) 

206 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

207 ).scalar_one_or_none() 

208 if not subscription: 

209 # Can't upgrade a member to admin if they're not already a member 

210 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER) 

211 if subscription.role == ClusterRole.admin: 

212 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN) 

213 

214 subscription.role = ClusterRole.admin 

215 

216 return empty_pb2.Empty() 

217 

218 def RemoveAdmin(self, request, context, session): 

219 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

220 if not node: 

221 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

222 if not can_moderate_node(session, context.user_id, node.id): 

223 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED) 

224 

225 user = session.execute( 

226 select(User).where_users_visible(context).where(User.id == request.user_id) 

227 ).scalar_one_or_none() 

228 if not user: 

229 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

230 

231 subscription = session.execute( 

232 select(ClusterSubscription) 

233 .where(ClusterSubscription.user_id == user.id) 

234 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

235 ).scalar_one_or_none() 

236 if not subscription: 

237 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER) 

238 if subscription.role == ClusterRole.member: 

239 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN) 

240 

241 subscription.role = ClusterRole.member 

242 

243 return empty_pb2.Empty() 

244 

245 def ListMembers(self, request, context, session): 

246 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

247 next_member_id = int(request.page_token) if request.page_token else 0 

248 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

249 if not node: 

250 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

251 members = ( 

252 session.execute( 

253 select(User) 

254 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

255 .where_users_visible(context) 

256 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

257 .where(User.id >= next_member_id) 

258 .order_by(User.id) 

259 .limit(page_size + 1) 

260 ) 

261 .scalars() 

262 .all() 

263 ) 

264 return communities_pb2.ListMembersRes( 

265 member_user_ids=[member.id for member in members[:page_size]], 

266 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

267 ) 

268 

269 def ListNearbyUsers(self, request, context, session): 

270 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

271 next_nearby_id = int(request.page_token) if request.page_token else 0 

272 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

273 if not node: 

274 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

275 nearbys = ( 

276 session.execute( 

277 select(User) 

278 .where_users_visible(context) 

279 .where(func.ST_Contains(node.geom, User.geom)) 

280 .where(User.id >= next_nearby_id) 

281 .order_by(User.id) 

282 .limit(page_size + 1) 

283 ) 

284 .scalars() 

285 .all() 

286 ) 

287 return communities_pb2.ListNearbyUsersRes( 

288 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]], 

289 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None, 

290 ) 

291 

292 def ListPlaces(self, request, context, session): 

293 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

294 next_page_id = int(request.page_token) if request.page_token else 0 

295 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

296 if not node: 

297 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

298 places = ( 

299 node.official_cluster.owned_pages.where(Page.type == PageType.place) 

300 .where(Page.id >= next_page_id) 

301 .order_by(Page.id) 

302 .limit(page_size + 1) 

303 .all() 

304 ) 

305 return communities_pb2.ListPlacesRes( 

306 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

307 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

308 ) 

309 

310 def ListGuides(self, request, context, session): 

311 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

312 next_page_id = int(request.page_token) if request.page_token else 0 

313 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

314 if not node: 

315 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

316 guides = ( 

317 node.official_cluster.owned_pages.where(Page.type == PageType.guide) 

318 .where(Page.id >= next_page_id) 

319 .order_by(Page.id) 

320 .limit(page_size + 1) 

321 .all() 

322 ) 

323 return communities_pb2.ListGuidesRes( 

324 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

325 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

326 ) 

327 

328 def ListEvents(self, request, context, session): 

329 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

330 # the page token is a unix timestamp of where we left off 

331 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

332 

333 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

334 if not node: 

335 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

336 

337 # for communities, we list events owned by this community or for which this is a parent 

338 occurrences = ( 

339 select(EventOccurrence) 

340 .join(Event, Event.id == EventOccurrence.event_id) 

341 .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node)) 

342 ) 

343 

344 if request.past: 

345 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

346 EventOccurrence.start_time.desc() 

347 ) 

348 else: 

349 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

350 EventOccurrence.start_time.asc() 

351 ) 

352 

353 occurrences = occurrences.limit(page_size + 1) 

354 occurrences = session.execute(occurrences).scalars().all() 

355 

356 return communities_pb2.ListEventsRes( 

357 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

358 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

359 ) 

360 

361 def ListDiscussions(self, request, context, session): 

362 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

363 next_page_id = int(request.page_token) if request.page_token else 0 

364 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

365 if not node: 

366 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

367 discussions = ( 

368 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0)) 

369 .order_by(Discussion.id.desc()) 

370 .limit(page_size + 1) 

371 .all() 

372 ) 

373 return communities_pb2.ListDiscussionsRes( 

374 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]], 

375 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

376 ) 

377 

378 def JoinCommunity(self, request, context, session): 

379 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

380 if not node: 

381 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

382 

383 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

384 if current_membership: 

385 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY) 

386 

387 node.official_cluster.cluster_subscriptions.append( 

388 ClusterSubscription( 

389 user_id=context.user_id, 

390 role=ClusterRole.member, 

391 ) 

392 ) 

393 

394 return empty_pb2.Empty() 

395 

396 def LeaveCommunity(self, request, context, session): 

397 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

398 if not node: 

399 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

400 

401 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

402 

403 if not current_membership: 

404 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY) 

405 

406 if context.user_id in node.contained_user_ids: 

407 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY) 

408 

409 session.execute( 

410 delete(ClusterSubscription) 

411 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

412 .where(ClusterSubscription.user_id == context.user_id) 

413 ) 

414 

415 return empty_pb2.Empty() 

416 

417 def ListUserCommunities(self, request, context, session): 

418 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

419 next_node_id = int(request.page_token) if request.page_token else 0 

420 user_id = request.user_id or context.user_id 

421 nodes = ( 

422 session.execute( 

423 select(Node) 

424 .join(Cluster, Cluster.parent_node_id == Node.id) 

425 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

426 .where(ClusterSubscription.user_id == user_id) 

427 .where(Cluster.is_official_cluster) 

428 .where(Node.id >= next_node_id) 

429 .order_by(Node.id) 

430 .limit(page_size + 1) 

431 ) 

432 .scalars() 

433 .all() 

434 ) 

435 

436 return communities_pb2.ListUserCommunitiesRes( 

437 communities=communities_to_pb(session, nodes[:page_size], context), 

438 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

439 )