Coverage for src/couchers/servicers/communities.py: 83%

192 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 20:25 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers import errors 

9from couchers.constants import COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD 

10from couchers.crypto import decrypt_page_token, encrypt_page_token 

11from couchers.db import can_moderate_node, get_node_parents_recursively 

12from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount 

13from couchers.models import ( 

14 Cluster, 

15 ClusterRole, 

16 ClusterSubscription, 

17 Discussion, 

18 Event, 

19 EventOccurrence, 

20 Node, 

21 Page, 

22 PageType, 

23 User, 

24) 

25from couchers.servicers.discussions import discussion_to_pb 

26from couchers.servicers.events import event_to_pb 

27from couchers.servicers.groups import group_to_pb 

28from couchers.servicers.pages import page_to_pb 

29from couchers.sql import couchers_select as select 

30from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

31from proto import communities_pb2, communities_pb2_grpc, groups_pb2 

32 

33logger = logging.getLogger(__name__) 

34 

35MAX_PAGINATION_LENGTH = 25 

36 

37 

38def _parents_to_pb(session, node_id): 

39 parents = get_node_parents_recursively(session, node_id) 

40 return [ 

41 groups_pb2.Parent( 

42 community=groups_pb2.CommunityParent( 

43 community_id=node_id, 

44 name=cluster.name, 

45 slug=cluster.slug, 

46 description=cluster.description, 

47 ) 

48 ) 

49 for node_id, parent_node_id, level, cluster in parents 

50 ] 

51 

52 

53def communities_to_pb(session, nodes: list[Node], context): 

54 can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes] 

55 

56 official_clusters = [node.official_cluster for node in nodes] 

57 official_cluster_ids = [cluster.id for cluster in official_clusters] 

58 

59 member_counts = dict( 

60 session.execute( 

61 select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where( 

62 ClusterSubscriptionCount.cluster_id.in_(official_cluster_ids) 

63 ) 

64 ).all() 

65 ) 

66 cluster_memberships = set( 

67 session.execute( 

68 select(ClusterSubscription.cluster_id) 

69 .where(ClusterSubscription.user_id == context.user_id) 

70 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

71 ) 

72 .scalars() 

73 .all() 

74 ) 

75 

76 admin_counts = dict( 

77 session.execute( 

78 select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where( 

79 ClusterAdminCount.cluster_id.in_(official_cluster_ids) 

80 ) 

81 ).all() 

82 ) 

83 cluster_adminships = set( 

84 session.execute( 

85 select(ClusterSubscription.cluster_id) 

86 .where(ClusterSubscription.user_id == context.user_id) 

87 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

88 .where(ClusterSubscription.role == ClusterRole.admin) 

89 ) 

90 .scalars() 

91 .all() 

92 ) 

93 

94 return [ 

95 communities_pb2.Community( 

96 community_id=node.id, 

97 name=official_cluster.name, 

98 slug=official_cluster.slug, 

99 description=official_cluster.description, 

100 created=Timestamp_from_datetime(node.created), 

101 parents=_parents_to_pb(session, node.id), 

102 member=official_cluster.id in cluster_memberships, 

103 admin=official_cluster.id in cluster_adminships, 

104 member_count=member_counts.get(official_cluster.id, 1), 

105 admin_count=admin_counts.get(official_cluster.id, 1), 

106 main_page=page_to_pb(session, official_cluster.main_page, context), 

107 can_moderate=can_moderate, 

108 discussions_enabled=official_cluster.discussions_enabled, 

109 events_enabled=official_cluster.events_enabled, 

110 ) 

111 for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates) 

112 ] 

113 

114 

115def community_to_pb(session, node: Node, context): 

116 return communities_to_pb(session, [node], context)[0] 

117 

118 

119class Communities(communities_pb2_grpc.CommunitiesServicer): 

120 def GetCommunity(self, request, context, session): 

121 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

122 if not node: 

123 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

124 

125 return community_to_pb(session, node, context) 

126 

127 def ListCommunities(self, request, context, session): 

128 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

129 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0 

130 nodes = ( 

131 session.execute( 

132 select(Node) 

133 .join(Cluster, Cluster.parent_node_id == Node.id) 

134 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0)) 

135 .where(Cluster.is_official_cluster) 

136 .order_by(Cluster.name) 

137 .limit(page_size + 1) 

138 .offset(offset) 

139 ) 

140 .scalars() 

141 .all() 

142 ) 

143 return communities_pb2.ListCommunitiesRes( 

144 communities=communities_to_pb(session, nodes[:page_size], context), 

145 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None, 

146 ) 

147 

148 def SearchCommunities(self, request, context, session): 

149 raw_query = request.query.strip() 

150 if len(raw_query) < 3: 

151 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.QUERY_TOO_SHORT) 

152 

153 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

154 

155 word_similarity_score = func.word_similarity(func.unaccent(raw_query), func.immutable_unaccent(Cluster.name)) 

156 

157 query = ( 

158 select(Node) 

159 .join(Cluster, Cluster.parent_node_id == Node.id) 

160 .where(Cluster.is_official_cluster) 

161 .where(word_similarity_score > COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD) 

162 .order_by(word_similarity_score.desc(), Cluster.name.asc(), Node.id.asc()) 

163 .limit(page_size) 

164 ) 

165 

166 rows = session.execute(query).scalars().all() 

167 

168 return communities_pb2.SearchCommunitiesRes(communities=communities_to_pb(session, rows, context)) 

169 

170 def ListGroups(self, request, context, session): 

171 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

172 next_cluster_id = int(request.page_token) if request.page_token else 0 

173 clusters = ( 

174 session.execute( 

175 select(Cluster) 

176 .where(~Cluster.is_official_cluster) # not an official group 

177 .where(Cluster.parent_node_id == request.community_id) 

178 .where(Cluster.id >= next_cluster_id) 

179 .order_by(Cluster.id) 

180 .limit(page_size + 1) 

181 ) 

182 .scalars() 

183 .all() 

184 ) 

185 return communities_pb2.ListGroupsRes( 

186 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]], 

187 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

188 ) 

189 

190 def ListAdmins(self, request, context, session): 

191 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

192 next_admin_id = int(request.page_token) if request.page_token else 0 

193 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

194 if not node: 

195 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

196 admins = ( 

197 session.execute( 

198 select(User) 

199 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

200 .where_users_visible(context) 

201 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

202 .where(ClusterSubscription.role == ClusterRole.admin) 

203 .where(User.id >= next_admin_id) 

204 .order_by(User.id) 

205 .limit(page_size + 1) 

206 ) 

207 .scalars() 

208 .all() 

209 ) 

210 return communities_pb2.ListAdminsRes( 

211 admin_user_ids=[admin.id for admin in admins[:page_size]], 

212 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

213 ) 

214 

215 def AddAdmin(self, request, context, session): 

216 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

217 if not node: 

218 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

219 if not can_moderate_node(session, context.user_id, node.id): 

220 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED) 

221 

222 user = session.execute( 

223 select(User).where_users_visible(context).where(User.id == request.user_id) 

224 ).scalar_one_or_none() 

225 if not user: 

226 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

227 

228 subscription = session.execute( 

229 select(ClusterSubscription) 

230 .where(ClusterSubscription.user_id == user.id) 

231 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

232 ).scalar_one_or_none() 

233 if not subscription: 

234 # Can't upgrade a member to admin if they're not already a member 

235 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER) 

236 if subscription.role == ClusterRole.admin: 

237 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_ADMIN) 

238 

239 subscription.role = ClusterRole.admin 

240 

241 return empty_pb2.Empty() 

242 

243 def RemoveAdmin(self, request, context, session): 

244 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

245 if not node: 

246 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

247 if not can_moderate_node(session, context.user_id, node.id): 

248 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NODE_MODERATE_PERMISSION_DENIED) 

249 

250 user = session.execute( 

251 select(User).where_users_visible(context).where(User.id == request.user_id) 

252 ).scalar_one_or_none() 

253 if not user: 

254 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

255 

256 subscription = session.execute( 

257 select(ClusterSubscription) 

258 .where(ClusterSubscription.user_id == user.id) 

259 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

260 ).scalar_one_or_none() 

261 if not subscription: 

262 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_MEMBER) 

263 if subscription.role == ClusterRole.member: 

264 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_ADMIN) 

265 

266 subscription.role = ClusterRole.member 

267 

268 return empty_pb2.Empty() 

269 

270 def ListMembers(self, request, context, session): 

271 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

272 next_member_id = int(request.page_token) if request.page_token else None 

273 

274 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

275 if not node: 

276 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

277 

278 query = ( 

279 select(User) 

280 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

281 .where_users_visible(context) 

282 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

283 ) 

284 if next_member_id is not None: 

285 query = query.where(User.id <= next_member_id) 

286 members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all() 

287 

288 return communities_pb2.ListMembersRes( 

289 member_user_ids=[member.id for member in members[:page_size]], 

290 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

291 ) 

292 

293 def ListNearbyUsers(self, request, context, session): 

294 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

295 next_nearby_id = int(request.page_token) if request.page_token else 0 

296 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

297 if not node: 

298 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

299 nearbys = ( 

300 session.execute( 

301 select(User) 

302 .where_users_visible(context) 

303 .where(func.ST_Contains(node.geom, User.geom)) 

304 .where(User.id >= next_nearby_id) 

305 .order_by(User.id) 

306 .limit(page_size + 1) 

307 ) 

308 .scalars() 

309 .all() 

310 ) 

311 return communities_pb2.ListNearbyUsersRes( 

312 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]], 

313 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None, 

314 ) 

315 

316 def ListPlaces(self, request, context, session): 

317 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

318 next_page_id = int(request.page_token) if request.page_token else 0 

319 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

320 if not node: 

321 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

322 places = ( 

323 node.official_cluster.owned_pages.where(Page.type == PageType.place) 

324 .where(Page.id >= next_page_id) 

325 .order_by(Page.id) 

326 .limit(page_size + 1) 

327 .all() 

328 ) 

329 return communities_pb2.ListPlacesRes( 

330 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

331 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

332 ) 

333 

334 def ListGuides(self, request, context, session): 

335 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

336 next_page_id = int(request.page_token) if request.page_token else 0 

337 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

338 if not node: 

339 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

340 guides = ( 

341 node.official_cluster.owned_pages.where(Page.type == PageType.guide) 

342 .where(Page.id >= next_page_id) 

343 .order_by(Page.id) 

344 .limit(page_size + 1) 

345 .all() 

346 ) 

347 return communities_pb2.ListGuidesRes( 

348 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

349 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

350 ) 

351 

352 def ListEvents(self, request, context, session): 

353 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

354 # the page token is a unix timestamp of where we left off 

355 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

356 

357 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

358 if not node: 

359 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

360 if not node.official_cluster.events_enabled: 

361 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENTS_NOT_ENABLED) 

362 

363 if not request.include_parents: 

364 nodes_clusters_to_search = [(node.id, node.official_cluster)] 

365 else: 

366 # the first value is the node_id, the last is the cluster (object) 

367 nodes_clusters_to_search = [ 

368 (parent[0], parent[3]) for parent in get_node_parents_recursively(session, node.id) 

369 ] 

370 

371 membership_clauses = [] 

372 for node_id, official_cluster_obj in nodes_clusters_to_search: 

373 membership_clauses.append(Event.owner_cluster == official_cluster_obj) 

374 membership_clauses.append(Event.parent_node_id == node_id) 

375 

376 # for communities, we list events owned by this community or for which this is a parent 

377 occurrences = ( 

378 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*membership_clauses)) 

379 ) 

380 

381 if request.past: 

382 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

383 EventOccurrence.start_time.desc() 

384 ) 

385 else: 

386 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

387 EventOccurrence.start_time.asc() 

388 ) 

389 

390 occurrences = occurrences.limit(page_size + 1) 

391 occurrences = session.execute(occurrences).scalars().all() 

392 

393 return communities_pb2.ListEventsRes( 

394 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

395 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

396 ) 

397 

398 def ListDiscussions(self, request, context, session): 

399 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

400 next_page_id = int(request.page_token) if request.page_token else 0 

401 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

402 if not node: 

403 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

404 if not node.official_cluster.discussions_enabled: 

405 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.DISCUSSIONS_NOT_ENABLED) 

406 discussions = ( 

407 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0)) 

408 .order_by(Discussion.id.desc()) 

409 .limit(page_size + 1) 

410 .all() 

411 ) 

412 return communities_pb2.ListDiscussionsRes( 

413 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]], 

414 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

415 ) 

416 

417 def JoinCommunity(self, request, context, session): 

418 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

419 if not node: 

420 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

421 

422 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

423 if current_membership: 

424 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY) 

425 

426 node.official_cluster.cluster_subscriptions.append( 

427 ClusterSubscription( 

428 user_id=context.user_id, 

429 role=ClusterRole.member, 

430 ) 

431 ) 

432 

433 return empty_pb2.Empty() 

434 

435 def LeaveCommunity(self, request, context, session): 

436 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

437 if not node: 

438 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

439 

440 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

441 

442 if not current_membership: 

443 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY) 

444 

445 if context.user_id in node.contained_user_ids: 

446 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY) 

447 

448 session.execute( 

449 delete(ClusterSubscription) 

450 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

451 .where(ClusterSubscription.user_id == context.user_id) 

452 ) 

453 

454 return empty_pb2.Empty() 

455 

456 def ListUserCommunities(self, request, context, session): 

457 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

458 next_node_id = int(request.page_token) if request.page_token else 0 

459 user_id = request.user_id or context.user_id 

460 nodes = ( 

461 session.execute( 

462 select(Node) 

463 .join(Cluster, Cluster.parent_node_id == Node.id) 

464 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

465 .where(ClusterSubscription.user_id == user_id) 

466 .where(Cluster.is_official_cluster) 

467 .where(Node.id >= next_node_id) 

468 .order_by(Node.id) 

469 .limit(page_size + 1) 

470 ) 

471 .scalars() 

472 .all() 

473 ) 

474 

475 return communities_pb2.ListUserCommunitiesRes( 

476 communities=communities_to_pb(session, nodes[:page_size], context), 

477 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

478 )