Coverage for src/couchers/servicers/communities.py: 83%

180 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 17:19 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers import errors 

9from couchers.db import can_moderate_node, get_node_parents_recursively, session_scope 

10from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts 

11from couchers.models import ( 

12 Cluster, 

13 ClusterRole, 

14 ClusterSubscription, 

15 Discussion, 

16 Event, 

17 EventOccurrence, 

18 Node, 

19 Page, 

20 PageType, 

21 User, 

22) 

23from couchers.servicers.discussions import discussion_to_pb 

24from couchers.servicers.events import event_to_pb 

25from couchers.servicers.groups import group_to_pb 

26from couchers.servicers.pages import page_to_pb 

27from couchers.sql import couchers_select as select 

28from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

29from proto import communities_pb2, communities_pb2_grpc, groups_pb2 

30 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound on the page size of the paginated list endpoints in this module;
# also the page size used when a request leaves `page_size` unset (falsy).
MAX_PAGINATION_LENGTH = 25

34 

35 

def _parents_to_pb(node_id):
    """Convert the parent chain of a node into `groups_pb2.Parent` messages.

    The rows come from `get_node_parents_recursively`; each row is unpacked as
    (node id, parent node id, level, cluster). Only the node id and the cluster
    are used to fill the resulting `CommunityParent`.

    Returns a list with one `groups_pb2.Parent` per row, in the order the rows
    were returned.
    """
    with session_scope() as session:
        ancestors = []
        for ancestor_id, _parent_node_id, _level, cluster in get_node_parents_recursively(session, node_id):
            community = groups_pb2.CommunityParent(
                community_id=ancestor_id,
                name=cluster.name,
                slug=cluster.slug,
                description=cluster.description,
            )
            ancestors.append(groups_pb2.Parent(community=community))
        return ancestors

50 

51 

def community_to_pb(node: Node, context):
    """Build the `communities_pb2.Community` message for `node` as seen by the caller.

    Args:
        node: the community node; its `official_cluster` supplies the name, slug,
            description, main page and the cluster id used for the lookups.
        context: the request context; `context.user_id` is the user whose
            membership, admin status and moderation rights are reported.

    Returns:
        A populated `communities_pb2.Community`.
    """
    cluster = node.official_cluster

    with session_scope() as session:
        can_moderate = can_moderate_node(session, context.user_id, node.id)

        # The counts are read from the materialized views; a missing (or zero)
        # row falls back to 1.
        member_count = (
            session.execute(
                select(cluster_subscription_counts.c.count).where(
                    cluster_subscription_counts.c.cluster_id == cluster.id
                )
            ).scalar_one_or_none()
            or 1
        )
        admin_count = (
            session.execute(
                select(cluster_admin_counts.c.count).where(cluster_admin_counts.c.cluster_id == cluster.id)
            ).scalar_one_or_none()
            or 1
        )

        # A single lookup of the caller's subscription answers both "is member"
        # and "is admin"; there is no need to query the same row twice.
        subscription = session.execute(
            select(ClusterSubscription)
            .where(ClusterSubscription.user_id == context.user_id)
            .where(ClusterSubscription.cluster_id == cluster.id)
        ).scalar_one_or_none()
        is_member = subscription is not None
        is_admin = is_member and subscription.role == ClusterRole.admin

        return communities_pb2.Community(
            community_id=node.id,
            name=cluster.name,
            slug=cluster.slug,
            description=cluster.description,
            created=Timestamp_from_datetime(node.created),
            parents=_parents_to_pb(node.id),
            member=is_member,
            admin=is_admin,
            member_count=member_count,
            admin_count=admin_count,
            main_page=page_to_pb(cluster.main_page, context),
            can_moderate=can_moderate,
        )

105 

106 

def _page_size(request):
    """Return the page size for `request`, capped at (and defaulting to) MAX_PAGINATION_LENGTH."""
    return min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)


def _int_page_token(request):
    """Parse `request.page_token` as an integer id cursor; an empty token means 0 (start).

    Raises ValueError if the token is non-empty and not an integer.
    """
    return int(request.page_token) if request.page_token else 0


def _next_page_token(rows, page_size):
    """Return the id of the extra, (page_size + 1)-th row as the next page token, or None on the last page."""
    return str(rows[-1].id) if len(rows) > page_size else None


def _get_node_or_abort(session, context, community_id):
    """Load the Node with id `community_id`, aborting the RPC with NOT_FOUND if there is none."""
    node = session.execute(select(Node).where(Node.id == community_id)).scalar_one_or_none()
    if not node:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    return node


class Communities(communities_pb2_grpc.CommunitiesServicer):
    """gRPC servicer for community (node) queries, membership and admin management.

    The id-paginated list endpoints fetch `page_size + 1` rows: the first
    `page_size` rows are returned and the id of the extra row, when present,
    becomes `next_page_token`.
    """

    def GetCommunity(self, request, context):
        """Return the community with id `request.community_id`; aborts with NOT_FOUND if missing."""
        with session_scope() as session:
            node = _get_node_or_abort(session, context, request.community_id)
            return community_to_pb(node, context)

    def ListCommunities(self, request, context):
        """List sub-communities of `request.community_id`, or all communities when it is 0."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_node_id = _int_page_token(request)
            nodes = (
                session.execute(
                    select(Node)
                    .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0))
                    .where(Node.id >= next_node_id)
                    .order_by(Node.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListCommunitiesRes(
                communities=[community_to_pb(node, context) for node in nodes[:page_size]],
                next_page_token=_next_page_token(nodes, page_size),
            )

    def ListGroups(self, request, context):
        """List the non-official clusters (groups) whose parent node is `request.community_id`."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_cluster_id = _int_page_token(request)
            clusters = (
                session.execute(
                    select(Cluster)
                    .where(~Cluster.is_official_cluster)  # not an official group
                    .where(Cluster.parent_node_id == request.community_id)
                    .where(Cluster.id >= next_cluster_id)
                    .order_by(Cluster.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListGroupsRes(
                groups=[group_to_pb(cluster, context) for cluster in clusters[:page_size]],
                next_page_token=_next_page_token(clusters, page_size),
            )

    def ListAdmins(self, request, context):
        """List ids of visible users with the admin role in the community's official cluster."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_admin_id = _int_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            admins = (
                session.execute(
                    select(User)
                    .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                    .where_users_visible(context)
                    .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                    .where(ClusterSubscription.role == ClusterRole.admin)
                    .where(User.id >= next_admin_id)
                    .order_by(User.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListAdminsRes(
                admin_user_ids=[admin.id for admin in admins[:page_size]],
                next_page_token=_next_page_token(admins, page_size),
            )

    def AddAdmin(self, request, context):
        """Promote an existing member of the community to admin.

        Aborts with NOT_FOUND if the community or user is missing, and with
        FAILED_PRECONDITION if the caller cannot moderate the node, the user is
        not a member, or the user is already an admin.
        """
        with session_scope() as session:
            node = _get_node_or_abort(session, context, request.community_id)
            if not can_moderate_node(session, context.user_id, node.id):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)

            user = session.execute(
                select(User).where_users_visible(context).where(User.id == request.user_id)
            ).scalar_one_or_none()
            if not user:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

            subscription = session.execute(
                select(ClusterSubscription)
                .where(ClusterSubscription.user_id == user.id)
                .where(ClusterSubscription.cluster_id == node.official_cluster.id)
            ).scalar_one_or_none()
            if not subscription:
                # Can't upgrade a member to admin if they're not already a member
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
            if subscription.role == ClusterRole.admin:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN)

            subscription.role = ClusterRole.admin

            return empty_pb2.Empty()

    def RemoveAdmin(self, request, context):
        """Demote an admin of the community back to a plain member.

        Aborts with NOT_FOUND if the community or user is missing, and with
        FAILED_PRECONDITION if the caller cannot moderate the node, the user is
        not a member, or the user currently has the member role.
        """
        with session_scope() as session:
            node = _get_node_or_abort(session, context, request.community_id)
            if not can_moderate_node(session, context.user_id, node.id):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED)

            user = session.execute(
                select(User).where_users_visible(context).where(User.id == request.user_id)
            ).scalar_one_or_none()
            if not user:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

            subscription = session.execute(
                select(ClusterSubscription)
                .where(ClusterSubscription.user_id == user.id)
                .where(ClusterSubscription.cluster_id == node.official_cluster.id)
            ).scalar_one_or_none()
            if not subscription:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER)
            if subscription.role == ClusterRole.member:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN)

            subscription.role = ClusterRole.member

            return empty_pb2.Empty()

    def ListMembers(self, request, context):
        """List ids of visible users subscribed to the community's official cluster."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_member_id = _int_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            members = (
                session.execute(
                    select(User)
                    .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
                    .where_users_visible(context)
                    .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                    .where(User.id >= next_member_id)
                    .order_by(User.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListMembersRes(
                member_user_ids=[member.id for member in members[:page_size]],
                next_page_token=_next_page_token(members, page_size),
            )

    def ListNearbyUsers(self, request, context):
        """List ids of visible users whose `geom` is contained in the community node's `geom`."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_nearby_id = _int_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            nearbys = (
                session.execute(
                    select(User)
                    .where_users_visible(context)
                    .where(func.ST_Contains(node.geom, User.geom))
                    .where(User.id >= next_nearby_id)
                    .order_by(User.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return communities_pb2.ListNearbyUsersRes(
                nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]],
                next_page_token=_next_page_token(nearbys, page_size),
            )

    def ListPlaces(self, request, context):
        """List pages of type `place` owned by the community's official cluster."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_page_id = _int_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            places = (
                node.official_cluster.owned_pages.where(Page.type == PageType.place)
                .where(Page.id >= next_page_id)
                .order_by(Page.id)
                .limit(page_size + 1)
                .all()
            )
            return communities_pb2.ListPlacesRes(
                places=[page_to_pb(page, context) for page in places[:page_size]],
                next_page_token=_next_page_token(places, page_size),
            )

    def ListGuides(self, request, context):
        """List pages of type `guide` owned by the community's official cluster."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_page_id = _int_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            guides = (
                node.official_cluster.owned_pages.where(Page.type == PageType.guide)
                .where(Page.id >= next_page_id)
                .order_by(Page.id)
                .limit(page_size + 1)
                .all()
            )
            return communities_pb2.ListGuidesRes(
                guides=[page_to_pb(page, context) for page in guides[:page_size]],
                next_page_token=_next_page_token(guides, page_size),
            )

    def ListEvents(self, request, context):
        """List event occurrences for the community, upcoming by default or past when `request.past` is set.

        Unlike the other list endpoints, the page token here is a millisecond
        timestamp (an occurrence `end_time`), not an id.
        """
        with session_scope() as session:
            page_size = _page_size(request)
            # the page token is a unix timestamp of where we left off
            page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

            node = _get_node_or_abort(session, context, request.community_id)

            # for communities, we list events owned by this community or for which this is a parent
            occurrences = (
                select(EventOccurrence)
                .join(Event, Event.id == EventOccurrence.event_id)
                .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node))
            )

            # NOTE(review): the cursor and filter use end_time while rows are ordered by
            # start_time; when those two orders disagree, consecutive pages may repeat or
            # skip occurrences - confirm whether this needs a start_time-based cursor.
            if request.past:
                occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                    EventOccurrence.start_time.desc()
                )
            else:
                occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                    EventOccurrence.start_time.asc()
                )

            occurrences = occurrences.limit(page_size + 1)
            occurrences = session.execute(occurrences).scalars().all()

            return communities_pb2.ListEventsRes(
                events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
                next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
            )

    def ListDiscussions(self, request, context):
        """List discussions owned by the community's official cluster, highest id first."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_page_id = _int_page_token(request)
            node = _get_node_or_abort(session, context, request.community_id)
            discussions = (
                # a cursor of 0 means "first page": no upper bound on the id
                node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0))
                .order_by(Discussion.id.desc())
                .limit(page_size + 1)
                .all()
            )
            return communities_pb2.ListDiscussionsRes(
                discussions=[discussion_to_pb(discussion, context) for discussion in discussions[:page_size]],
                next_page_token=_next_page_token(discussions, page_size),
            )

    def JoinCommunity(self, request, context):
        """Subscribe the calling user to the community as a member.

        Aborts with NOT_FOUND if the community is missing and with
        FAILED_PRECONDITION if the caller is already a member.
        """
        with session_scope() as session:
            node = _get_node_or_abort(session, context, request.community_id)

            current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()
            if current_membership:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY)

            node.official_cluster.cluster_subscriptions.append(
                ClusterSubscription(
                    user_id=context.user_id,
                    role=ClusterRole.member,
                )
            )

            return empty_pb2.Empty()

    def LeaveCommunity(self, request, context):
        """Remove the calling user's subscription to the community.

        Aborts with NOT_FOUND if the community is missing and with
        FAILED_PRECONDITION if the caller is not a member or is listed in
        `node.contained_user_ids`.
        """
        with session_scope() as session:
            node = _get_node_or_abort(session, context, request.community_id)

            current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none()

            if not current_membership:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY)

            if context.user_id in node.contained_user_ids:
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY)

            session.execute(
                delete(ClusterSubscription)
                .where(ClusterSubscription.cluster_id == node.official_cluster.id)
                .where(ClusterSubscription.user_id == context.user_id)
            )

            return empty_pb2.Empty()

    def ListUserCommunities(self, request, context):
        """List communities whose official cluster `request.user_id` (or the caller, if unset) is subscribed to."""
        with session_scope() as session:
            page_size = _page_size(request)
            next_node_id = _int_page_token(request)
            user_id = request.user_id or context.user_id
            nodes = (
                session.execute(
                    select(Node)
                    .join(Cluster, Cluster.parent_node_id == Node.id)
                    .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                    .where(ClusterSubscription.user_id == user_id)
                    .where(Cluster.is_official_cluster)
                    .where(Node.id >= next_node_id)
                    .order_by(Node.id)
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )

            return communities_pb2.ListUserCommunitiesRes(
                communities=[community_to_pb(node, context) for node in nodes[:page_size]],
                next_page_token=_next_page_token(nodes, page_size),
            )