Coverage for src/couchers/servicers/communities.py: 83%

163 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-22 06:42 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers import errors 

9from couchers.db import can_moderate_node, get_node_parents_recursively 

10from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts 

11from couchers.models import ( 

12 Cluster, 

13 ClusterRole, 

14 ClusterSubscription, 

15 Discussion, 

16 Event, 

17 EventOccurrence, 

18 Node, 

19 Page, 

20 PageType, 

21 User, 

22) 

23from couchers.servicers.discussions import discussion_to_pb 

24from couchers.servicers.events import event_to_pb 

25from couchers.servicers.groups import group_to_pb 

26from couchers.servicers.pages import page_to_pb 

27from couchers.sql import couchers_select as select 

28from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

29from proto import communities_pb2, communities_pb2_grpc, groups_pb2 

30 

31logger = logging.getLogger(__name__) 

32 

33MAX_PAGINATION_LENGTH = 25 

34 

35 

36def _parents_to_pb(session, node_id): 

37 parents = get_node_parents_recursively(session, node_id) 

38 return [ 

39 groups_pb2.Parent( 

40 community=groups_pb2.CommunityParent( 

41 community_id=node_id, 

42 name=cluster.name, 

43 slug=cluster.slug, 

44 description=cluster.description, 

45 ) 

46 ) 

47 for node_id, parent_node_id, level, cluster in parents 

48 ] 

49 

50 

51def community_to_pb(session, node: Node, context): 

52 can_moderate = can_moderate_node(session, context.user_id, node.id) 

53 

54 member_count = ( 

55 session.execute( 

56 select(cluster_subscription_counts.c.count).where( 

57 cluster_subscription_counts.c.cluster_id == node.official_cluster.id 

58 ) 

59 ).scalar_one_or_none() 

60 or 1 

61 ) 

62 is_member = ( 

63 session.execute( 

64 select(ClusterSubscription) 

65 .where(ClusterSubscription.user_id == context.user_id) 

66 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

67 ).scalar_one_or_none() 

68 is not None 

69 ) 

70 

71 admin_count = ( 

72 session.execute( 

73 select(cluster_admin_counts.c.count).where(cluster_admin_counts.c.cluster_id == node.official_cluster.id) 

74 ).scalar_one_or_none() 

75 or 1 

76 ) 

77 is_admin = ( 

78 session.execute( 

79 select(ClusterSubscription) 

80 .where(ClusterSubscription.user_id == context.user_id) 

81 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

82 .where(ClusterSubscription.role == ClusterRole.admin) 

83 ).scalar_one_or_none() 

84 is not None 

85 ) 

86 

87 return communities_pb2.Community( 

88 community_id=node.id, 

89 name=node.official_cluster.name, 

90 slug=node.official_cluster.slug, 

91 description=node.official_cluster.description, 

92 created=Timestamp_from_datetime(node.created), 

93 parents=_parents_to_pb(session, node.id), 

94 member=is_member, 

95 admin=is_admin, 

96 member_count=member_count, 

97 admin_count=admin_count, 

98 main_page=page_to_pb(session, node.official_cluster.main_page, context), 

99 can_moderate=can_moderate, 

100 ) 

101 

102 

103class Communities(communities_pb2_grpc.CommunitiesServicer): 

104 def GetCommunity(self, request, context, session): 

105 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

106 if not node: 

107 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

108 

109 return community_to_pb(session, node, context) 

110 

111 def ListCommunities(self, request, context, session): 

112 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

113 next_node_id = int(request.page_token) if request.page_token else 0 

114 nodes = ( 

115 session.execute( 

116 select(Node) 

117 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0)) 

118 .where(Node.id >= next_node_id) 

119 .order_by(Node.id) 

120 .limit(page_size + 1) 

121 ) 

122 .scalars() 

123 .all() 

124 ) 

125 return communities_pb2.ListCommunitiesRes( 

126 communities=[community_to_pb(session, node, context) for node in nodes[:page_size]], 

127 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

128 ) 

129 

130 def ListGroups(self, request, context, session): 

131 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

132 next_cluster_id = int(request.page_token) if request.page_token else 0 

133 clusters = ( 

134 session.execute( 

135 select(Cluster) 

136 .where(~Cluster.is_official_cluster) # not an official group 

137 .where(Cluster.parent_node_id == request.community_id) 

138 .where(Cluster.id >= next_cluster_id) 

139 .order_by(Cluster.id) 

140 .limit(page_size + 1) 

141 ) 

142 .scalars() 

143 .all() 

144 ) 

145 return communities_pb2.ListGroupsRes( 

146 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]], 

147 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

148 ) 

149 

150 def ListAdmins(self, request, context, session): 

151 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

152 next_admin_id = int(request.page_token) if request.page_token else 0 

153 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

154 if not node: 

155 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

156 admins = ( 

157 session.execute( 

158 select(User) 

159 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

160 .where_users_visible(context) 

161 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

162 .where(ClusterSubscription.role == ClusterRole.admin) 

163 .where(User.id >= next_admin_id) 

164 .order_by(User.id) 

165 .limit(page_size + 1) 

166 ) 

167 .scalars() 

168 .all() 

169 ) 

170 return communities_pb2.ListAdminsRes( 

171 admin_user_ids=[admin.id for admin in admins[:page_size]], 

172 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

173 ) 

174 

175 def AddAdmin(self, request, context, session): 

176 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

177 if not node: 

178 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

179 if not can_moderate_node(session, context.user_id, node.id): 

180 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED) 

181 

182 user = session.execute( 

183 select(User).where_users_visible(context).where(User.id == request.user_id) 

184 ).scalar_one_or_none() 

185 if not user: 

186 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

187 

188 subscription = session.execute( 

189 select(ClusterSubscription) 

190 .where(ClusterSubscription.user_id == user.id) 

191 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

192 ).scalar_one_or_none() 

193 if not subscription: 

194 # Can't upgrade a member to admin if they're not already a member 

195 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER) 

196 if subscription.role == ClusterRole.admin: 

197 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN) 

198 

199 subscription.role = ClusterRole.admin 

200 

201 return empty_pb2.Empty() 

202 

203 def RemoveAdmin(self, request, context, session): 

204 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

205 if not node: 

206 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

207 if not can_moderate_node(session, context.user_id, node.id): 

208 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED) 

209 

210 user = session.execute( 

211 select(User).where_users_visible(context).where(User.id == request.user_id) 

212 ).scalar_one_or_none() 

213 if not user: 

214 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

215 

216 subscription = session.execute( 

217 select(ClusterSubscription) 

218 .where(ClusterSubscription.user_id == user.id) 

219 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

220 ).scalar_one_or_none() 

221 if not subscription: 

222 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER) 

223 if subscription.role == ClusterRole.member: 

224 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN) 

225 

226 subscription.role = ClusterRole.member 

227 

228 return empty_pb2.Empty() 

229 

230 def ListMembers(self, request, context, session): 

231 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

232 next_member_id = int(request.page_token) if request.page_token else 0 

233 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

234 if not node: 

235 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

236 members = ( 

237 session.execute( 

238 select(User) 

239 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

240 .where_users_visible(context) 

241 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

242 .where(User.id >= next_member_id) 

243 .order_by(User.id) 

244 .limit(page_size + 1) 

245 ) 

246 .scalars() 

247 .all() 

248 ) 

249 return communities_pb2.ListMembersRes( 

250 member_user_ids=[member.id for member in members[:page_size]], 

251 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

252 ) 

253 

254 def ListNearbyUsers(self, request, context, session): 

255 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

256 next_nearby_id = int(request.page_token) if request.page_token else 0 

257 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

258 if not node: 

259 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

260 nearbys = ( 

261 session.execute( 

262 select(User) 

263 .where_users_visible(context) 

264 .where(func.ST_Contains(node.geom, User.geom)) 

265 .where(User.id >= next_nearby_id) 

266 .order_by(User.id) 

267 .limit(page_size + 1) 

268 ) 

269 .scalars() 

270 .all() 

271 ) 

272 return communities_pb2.ListNearbyUsersRes( 

273 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]], 

274 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None, 

275 ) 

276 

277 def ListPlaces(self, request, context, session): 

278 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

279 next_page_id = int(request.page_token) if request.page_token else 0 

280 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

281 if not node: 

282 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

283 places = ( 

284 node.official_cluster.owned_pages.where(Page.type == PageType.place) 

285 .where(Page.id >= next_page_id) 

286 .order_by(Page.id) 

287 .limit(page_size + 1) 

288 .all() 

289 ) 

290 return communities_pb2.ListPlacesRes( 

291 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

292 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

293 ) 

294 

295 def ListGuides(self, request, context, session): 

296 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

297 next_page_id = int(request.page_token) if request.page_token else 0 

298 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

299 if not node: 

300 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

301 guides = ( 

302 node.official_cluster.owned_pages.where(Page.type == PageType.guide) 

303 .where(Page.id >= next_page_id) 

304 .order_by(Page.id) 

305 .limit(page_size + 1) 

306 .all() 

307 ) 

308 return communities_pb2.ListGuidesRes( 

309 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

310 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

311 ) 

312 

313 def ListEvents(self, request, context, session): 

314 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

315 # the page token is a unix timestamp of where we left off 

316 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

317 

318 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

319 if not node: 

320 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

321 

322 # for communities, we list events owned by this community or for which this is a parent 

323 occurrences = ( 

324 select(EventOccurrence) 

325 .join(Event, Event.id == EventOccurrence.event_id) 

326 .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node)) 

327 ) 

328 

329 if request.past: 

330 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

331 EventOccurrence.start_time.desc() 

332 ) 

333 else: 

334 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

335 EventOccurrence.start_time.asc() 

336 ) 

337 

338 occurrences = occurrences.limit(page_size + 1) 

339 occurrences = session.execute(occurrences).scalars().all() 

340 

341 return communities_pb2.ListEventsRes( 

342 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

343 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

344 ) 

345 

346 def ListDiscussions(self, request, context, session): 

347 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

348 next_page_id = int(request.page_token) if request.page_token else 0 

349 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

350 if not node: 

351 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

352 discussions = ( 

353 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0)) 

354 .order_by(Discussion.id.desc()) 

355 .limit(page_size + 1) 

356 .all() 

357 ) 

358 return communities_pb2.ListDiscussionsRes( 

359 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]], 

360 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

361 ) 

362 

363 def JoinCommunity(self, request, context, session): 

364 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

365 if not node: 

366 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

367 

368 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

369 if current_membership: 

370 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY) 

371 

372 node.official_cluster.cluster_subscriptions.append( 

373 ClusterSubscription( 

374 user_id=context.user_id, 

375 role=ClusterRole.member, 

376 ) 

377 ) 

378 

379 return empty_pb2.Empty() 

380 

381 def LeaveCommunity(self, request, context, session): 

382 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

383 if not node: 

384 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

385 

386 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

387 

388 if not current_membership: 

389 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY) 

390 

391 if context.user_id in node.contained_user_ids: 

392 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY) 

393 

394 session.execute( 

395 delete(ClusterSubscription) 

396 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

397 .where(ClusterSubscription.user_id == context.user_id) 

398 ) 

399 

400 return empty_pb2.Empty() 

401 

402 def ListUserCommunities(self, request, context, session): 

403 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

404 next_node_id = int(request.page_token) if request.page_token else 0 

405 user_id = request.user_id or context.user_id 

406 nodes = ( 

407 session.execute( 

408 select(Node) 

409 .join(Cluster, Cluster.parent_node_id == Node.id) 

410 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

411 .where(ClusterSubscription.user_id == user_id) 

412 .where(Cluster.is_official_cluster) 

413 .where(Node.id >= next_node_id) 

414 .order_by(Node.id) 

415 .limit(page_size + 1) 

416 ) 

417 .scalars() 

418 .all() 

419 ) 

420 

421 return communities_pb2.ListUserCommunitiesRes( 

422 communities=[community_to_pb(session, node, context) for node in nodes[:page_size]], 

423 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

424 )