Coverage for src/couchers/servicers/communities.py: 84%

194 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers.constants import COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD 

9from couchers.crypto import decrypt_page_token, encrypt_page_token 

10from couchers.db import can_moderate_node, get_node_parents_recursively, is_user_in_node_geography 

11from couchers.materialized_views import ClusterAdminCount, ClusterSubscriptionCount 

12from couchers.models import ( 

13 Cluster, 

14 ClusterRole, 

15 ClusterSubscription, 

16 Discussion, 

17 Event, 

18 EventOccurrence, 

19 Node, 

20 Page, 

21 PageType, 

22 User, 

23) 

24from couchers.proto import communities_pb2, communities_pb2_grpc, groups_pb2 

25from couchers.servicers.discussions import discussion_to_pb 

26from couchers.servicers.events import event_to_pb 

27from couchers.servicers.groups import group_to_pb 

28from couchers.servicers.pages import page_to_pb 

29from couchers.sql import couchers_select as select 

30from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

31 

32logger = logging.getLogger(__name__) 

33 

34MAX_PAGINATION_LENGTH = 25 

35 

36 

37def _parents_to_pb(session, node_id): 

38 parents = get_node_parents_recursively(session, node_id) 

39 return [ 

40 groups_pb2.Parent( 

41 community=groups_pb2.CommunityParent( 

42 community_id=node_id, 

43 name=cluster.name, 

44 slug=cluster.slug, 

45 description=cluster.description, 

46 ) 

47 ) 

48 for node_id, parent_node_id, level, cluster in parents 

49 ] 

50 

51 

52def communities_to_pb(session, nodes: list[Node], context): 

53 can_moderates = [can_moderate_node(session, context.user_id, node.id) for node in nodes] 

54 

55 official_clusters = [node.official_cluster for node in nodes] 

56 official_cluster_ids = [cluster.id for cluster in official_clusters] 

57 

58 member_counts = dict( 

59 session.execute( 

60 select(ClusterSubscriptionCount.cluster_id, ClusterSubscriptionCount.count).where( 

61 ClusterSubscriptionCount.cluster_id.in_(official_cluster_ids) 

62 ) 

63 ).all() 

64 ) 

65 cluster_memberships = set( 

66 session.execute( 

67 select(ClusterSubscription.cluster_id) 

68 .where(ClusterSubscription.user_id == context.user_id) 

69 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

70 ) 

71 .scalars() 

72 .all() 

73 ) 

74 

75 admin_counts = dict( 

76 session.execute( 

77 select(ClusterAdminCount.cluster_id, ClusterAdminCount.count).where( 

78 ClusterAdminCount.cluster_id.in_(official_cluster_ids) 

79 ) 

80 ).all() 

81 ) 

82 cluster_adminships = set( 

83 session.execute( 

84 select(ClusterSubscription.cluster_id) 

85 .where(ClusterSubscription.user_id == context.user_id) 

86 .where(ClusterSubscription.cluster_id.in_(official_cluster_ids)) 

87 .where(ClusterSubscription.role == ClusterRole.admin) 

88 ) 

89 .scalars() 

90 .all() 

91 ) 

92 

93 return [ 

94 communities_pb2.Community( 

95 community_id=node.id, 

96 name=official_cluster.name, 

97 slug=official_cluster.slug, 

98 description=official_cluster.description, 

99 created=Timestamp_from_datetime(node.created), 

100 parents=_parents_to_pb(session, node.id), 

101 member=official_cluster.id in cluster_memberships, 

102 admin=official_cluster.id in cluster_adminships, 

103 member_count=member_counts.get(official_cluster.id, 1), 

104 admin_count=admin_counts.get(official_cluster.id, 1), 

105 main_page=page_to_pb(session, official_cluster.main_page, context), 

106 can_moderate=can_moderate, 

107 discussions_enabled=official_cluster.discussions_enabled, 

108 events_enabled=official_cluster.events_enabled, 

109 ) 

110 for node, official_cluster, can_moderate in zip(nodes, official_clusters, can_moderates) 

111 ] 

112 

113 

114def community_to_pb(session, node: Node, context): 

115 return communities_to_pb(session, [node], context)[0] 

116 

117 

118class Communities(communities_pb2_grpc.CommunitiesServicer): 

119 def GetCommunity(self, request, context, session): 

120 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

121 if not node: 

122 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

123 

124 return community_to_pb(session, node, context) 

125 

126 def ListCommunities(self, request, context, session): 

127 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

128 offset = int(decrypt_page_token(request.page_token)) if request.page_token else 0 

129 nodes = ( 

130 session.execute( 

131 select(Node) 

132 .join(Cluster, Cluster.parent_node_id == Node.id) 

133 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0)) 

134 .where(Cluster.is_official_cluster) 

135 .order_by(Cluster.name) 

136 .limit(page_size + 1) 

137 .offset(offset) 

138 ) 

139 .scalars() 

140 .all() 

141 ) 

142 return communities_pb2.ListCommunitiesRes( 

143 communities=communities_to_pb(session, nodes[:page_size], context), 

144 next_page_token=encrypt_page_token(str(offset + page_size)) if len(nodes) > page_size else None, 

145 ) 

146 

147 def SearchCommunities(self, request, context, session): 

148 raw_query = request.query.strip() 

149 if len(raw_query) < 3: 

150 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "query_too_short") 

151 

152 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

153 

154 word_similarity_score = func.word_similarity(func.unaccent(raw_query), func.immutable_unaccent(Cluster.name)) 

155 

156 query = ( 

157 select(Node) 

158 .join(Cluster, Cluster.parent_node_id == Node.id) 

159 .where(Cluster.is_official_cluster) 

160 .where(word_similarity_score > COMMUNITIES_SEARCH_FUZZY_SIMILARITY_THRESHOLD) 

161 .order_by(word_similarity_score.desc(), Cluster.name.asc(), Node.id.asc()) 

162 .limit(page_size) 

163 ) 

164 

165 rows = session.execute(query).scalars().all() 

166 

167 return communities_pb2.SearchCommunitiesRes(communities=communities_to_pb(session, rows, context)) 

168 

169 def ListGroups(self, request, context, session): 

170 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

171 next_cluster_id = int(request.page_token) if request.page_token else 0 

172 clusters = ( 

173 session.execute( 

174 select(Cluster) 

175 .where(~Cluster.is_official_cluster) # not an official group 

176 .where(Cluster.parent_node_id == request.community_id) 

177 .where(Cluster.id >= next_cluster_id) 

178 .order_by(Cluster.id) 

179 .limit(page_size + 1) 

180 ) 

181 .scalars() 

182 .all() 

183 ) 

184 return communities_pb2.ListGroupsRes( 

185 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]], 

186 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

187 ) 

188 

189 def ListAdmins(self, request, context, session): 

190 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

191 next_admin_id = int(request.page_token) if request.page_token else 0 

192 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

193 if not node: 

194 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

195 admins = ( 

196 session.execute( 

197 select(User) 

198 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

199 .where_users_visible(context) 

200 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

201 .where(ClusterSubscription.role == ClusterRole.admin) 

202 .where(User.id >= next_admin_id) 

203 .order_by(User.id) 

204 .limit(page_size + 1) 

205 ) 

206 .scalars() 

207 .all() 

208 ) 

209 return communities_pb2.ListAdminsRes( 

210 admin_user_ids=[admin.id for admin in admins[:page_size]], 

211 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

212 ) 

213 

214 def AddAdmin(self, request, context, session): 

215 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

216 if not node: 

217 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

218 if not can_moderate_node(session, context.user_id, node.id): 

219 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied") 

220 

221 user = session.execute( 

222 select(User).where_users_visible(context).where(User.id == request.user_id) 

223 ).scalar_one_or_none() 

224 if not user: 

225 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

226 

227 subscription = session.execute( 

228 select(ClusterSubscription) 

229 .where(ClusterSubscription.user_id == user.id) 

230 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

231 ).scalar_one_or_none() 

232 if not subscription: 

233 # Can't upgrade a member to admin if they're not already a member 

234 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member") 

235 if subscription.role == ClusterRole.admin: 

236 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_admin") 

237 

238 subscription.role = ClusterRole.admin 

239 

240 return empty_pb2.Empty() 

241 

242 def RemoveAdmin(self, request, context, session): 

243 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

244 if not node: 

245 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

246 if not can_moderate_node(session, context.user_id, node.id): 

247 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "node_moderate_permission_denied") 

248 

249 user = session.execute( 

250 select(User).where_users_visible(context).where(User.id == request.user_id) 

251 ).scalar_one_or_none() 

252 if not user: 

253 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

254 

255 subscription = session.execute( 

256 select(ClusterSubscription) 

257 .where(ClusterSubscription.user_id == user.id) 

258 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

259 ).scalar_one_or_none() 

260 if not subscription: 

261 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_member") 

262 if subscription.role == ClusterRole.member: 

263 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_admin") 

264 

265 subscription.role = ClusterRole.member 

266 

267 return empty_pb2.Empty() 

268 

269 def ListMembers(self, request, context, session): 

270 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

271 next_member_id = int(request.page_token) if request.page_token else None 

272 

273 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

274 if not node: 

275 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

276 

277 query = ( 

278 select(User) 

279 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

280 .where_users_visible(context) 

281 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

282 ) 

283 if next_member_id is not None: 

284 query = query.where(User.id <= next_member_id) 

285 members = session.execute(query.order_by(User.id.desc()).limit(page_size + 1)).scalars().all() 

286 

287 return communities_pb2.ListMembersRes( 

288 member_user_ids=[member.id for member in members[:page_size]], 

289 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

290 ) 

291 

292 def ListNearbyUsers(self, request, context, session): 

293 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

294 next_nearby_id = int(request.page_token) if request.page_token else 0 

295 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

296 if not node: 

297 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

298 nearbys = ( 

299 session.execute( 

300 select(User) 

301 .where_users_visible(context) 

302 .where(func.ST_Contains(node.geom, User.geom)) 

303 .where(User.id >= next_nearby_id) 

304 .order_by(User.id) 

305 .limit(page_size + 1) 

306 ) 

307 .scalars() 

308 .all() 

309 ) 

310 return communities_pb2.ListNearbyUsersRes( 

311 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]], 

312 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None, 

313 ) 

314 

315 def ListPlaces(self, request, context, session): 

316 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

317 next_page_id = int(request.page_token) if request.page_token else 0 

318 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

319 if not node: 

320 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

321 places = ( 

322 node.official_cluster.owned_pages.where(Page.type == PageType.place) 

323 .where(Page.id >= next_page_id) 

324 .order_by(Page.id) 

325 .limit(page_size + 1) 

326 .all() 

327 ) 

328 return communities_pb2.ListPlacesRes( 

329 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

330 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

331 ) 

332 

333 def ListGuides(self, request, context, session): 

334 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

335 next_page_id = int(request.page_token) if request.page_token else 0 

336 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

337 if not node: 

338 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

339 guides = ( 

340 node.official_cluster.owned_pages.where(Page.type == PageType.guide) 

341 .where(Page.id >= next_page_id) 

342 .order_by(Page.id) 

343 .limit(page_size + 1) 

344 .all() 

345 ) 

346 return communities_pb2.ListGuidesRes( 

347 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

348 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

349 ) 

350 

351 def ListEvents(self, request, context, session): 

352 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

353 # the page token is a unix timestamp of where we left off 

354 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

355 

356 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

357 if not node: 

358 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

359 if not node.official_cluster.events_enabled: 

360 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled") 

361 

362 if not request.include_parents: 

363 nodes_clusters_to_search = [(node.id, node.official_cluster)] 

364 else: 

365 # the first value is the node_id, the last is the cluster (object) 

366 nodes_clusters_to_search = [ 

367 (parent[0], parent[3]) for parent in get_node_parents_recursively(session, node.id) 

368 ] 

369 

370 membership_clauses = [] 

371 for node_id, official_cluster_obj in nodes_clusters_to_search: 

372 membership_clauses.append(Event.owner_cluster == official_cluster_obj) 

373 membership_clauses.append(Event.parent_node_id == node_id) 

374 

375 # for communities, we list events owned by this community or for which this is a parent 

376 occurrences = ( 

377 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(or_(*membership_clauses)) 

378 ) 

379 

380 if request.past: 

381 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

382 EventOccurrence.start_time.desc() 

383 ) 

384 else: 

385 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

386 EventOccurrence.start_time.asc() 

387 ) 

388 

389 occurrences = occurrences.limit(page_size + 1) 

390 occurrences = session.execute(occurrences).scalars().all() 

391 

392 return communities_pb2.ListEventsRes( 

393 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

394 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

395 ) 

396 

397 def ListDiscussions(self, request, context, session): 

398 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

399 next_page_id = int(request.page_token) if request.page_token else 0 

400 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

401 if not node: 

402 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

403 if not node.official_cluster.discussions_enabled: 

404 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "discussions_not_enabled") 

405 discussions = ( 

406 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0)) 

407 .order_by(Discussion.id.desc()) 

408 .limit(page_size + 1) 

409 .all() 

410 ) 

411 return communities_pb2.ListDiscussionsRes( 

412 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]], 

413 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

414 ) 

415 

416 def JoinCommunity(self, request, context, session): 

417 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

418 if not node: 

419 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

420 

421 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

422 if current_membership: 

423 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_community") 

424 

425 node.official_cluster.cluster_subscriptions.append( 

426 ClusterSubscription( 

427 user_id=context.user_id, 

428 role=ClusterRole.member, 

429 ) 

430 ) 

431 

432 return empty_pb2.Empty() 

433 

434 def LeaveCommunity(self, request, context, session): 

435 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

436 if not node: 

437 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

438 

439 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

440 

441 if not current_membership: 

442 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_community") 

443 

444 if is_user_in_node_geography(session, context.user_id, node.id): 

445 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "cannot_leave_containing_community") 

446 

447 session.execute( 

448 delete(ClusterSubscription) 

449 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

450 .where(ClusterSubscription.user_id == context.user_id) 

451 ) 

452 

453 return empty_pb2.Empty() 

454 

455 def ListUserCommunities(self, request, context, session): 

456 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

457 next_node_id = int(request.page_token) if request.page_token else 0 

458 user_id = request.user_id or context.user_id 

459 nodes = ( 

460 session.execute( 

461 select(Node) 

462 .join(Cluster, Cluster.parent_node_id == Node.id) 

463 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

464 .where(ClusterSubscription.user_id == user_id) 

465 .where(Cluster.is_official_cluster) 

466 .where(Node.id >= next_node_id) 

467 .order_by(Node.id) 

468 .limit(page_size + 1) 

469 ) 

470 .scalars() 

471 .all() 

472 ) 

473 

474 return communities_pb2.ListUserCommunitiesRes( 

475 communities=communities_to_pb(session, nodes[:page_size], context), 

476 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

477 ) 

478 

479 def ListAllCommunities(self, request, context, session): 

480 """List all communities ordered hierarchically by parent-child relationships""" 

481 # Get all nodes with their clusters, member counts, and user membership in a single query 

482 results = session.execute( 

483 select( 

484 Node, 

485 Cluster, 

486 ClusterSubscriptionCount.count, 

487 ClusterSubscription.cluster_id.label("user_subscription"), 

488 ) 

489 .join(Cluster, Cluster.parent_node_id == Node.id) 

490 .outerjoin( 

491 ClusterSubscriptionCount, 

492 ClusterSubscriptionCount.cluster_id == Cluster.id, 

493 ) 

494 .outerjoin( 

495 ClusterSubscription, 

496 (ClusterSubscription.cluster_id == Cluster.id) & (ClusterSubscription.user_id == context.user_id), 

497 ) 

498 .where(Cluster.is_official_cluster) 

499 .order_by(Node.id) 

500 ).all() 

501 

502 return communities_pb2.ListAllCommunitiesRes( 

503 communities=[ 

504 communities_pb2.CommunitySummary( 

505 community_id=node.id, 

506 name=cluster.name, 

507 slug=cluster.slug, 

508 member=user_subscription is not None, 

509 member_count=member_count or 1, 

510 parents=_parents_to_pb(session, node.id), 

511 created=Timestamp_from_datetime(node.created), 

512 ) 

513 for node, cluster, member_count, user_subscription in results 

514 ], 

515 )