Coverage for src/couchers/servicers/communities.py: 83%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

146 statements  

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers import errors 

9from couchers.db import can_moderate_node, get_node_parents_recursively, session_scope 

10from couchers.materialized_views import cluster_admin_counts, cluster_subscription_counts 

11from couchers.models import ( 

12 Cluster, 

13 ClusterRole, 

14 ClusterSubscription, 

15 Discussion, 

16 Event, 

17 EventOccurrence, 

18 Node, 

19 Page, 

20 PageType, 

21 User, 

22) 

23from couchers.servicers.discussions import discussion_to_pb 

24from couchers.servicers.events import event_to_pb 

25from couchers.servicers.groups import group_to_pb 

26from couchers.servicers.pages import page_to_pb 

27from couchers.sql import couchers_select as select 

28from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

29from proto import communities_pb2, communities_pb2_grpc, groups_pb2 

30 

31logger = logging.getLogger(__name__) 

32 

33MAX_PAGINATION_LENGTH = 25 

34 

35 

36def _parents_to_pb(node_id): 

37 with session_scope() as session: 

38 parents = get_node_parents_recursively(session, node_id) 

39 return [ 

40 groups_pb2.Parent( 

41 community=groups_pb2.CommunityParent( 

42 community_id=node_id, 

43 name=cluster.name, 

44 slug=cluster.slug, 

45 description=cluster.description, 

46 ) 

47 ) 

48 for node_id, parent_node_id, level, cluster in parents 

49 ] 

50 

51 

52def community_to_pb(node: Node, context): 

53 with session_scope() as session: 

54 can_moderate = can_moderate_node(session, context.user_id, node.id) 

55 

56 member_count = ( 

57 session.execute( 

58 select(cluster_subscription_counts.c.count).where( 

59 cluster_subscription_counts.c.cluster_id == node.official_cluster.id 

60 ) 

61 ).scalar_one_or_none() 

62 or 1 

63 ) 

64 is_member = ( 

65 session.execute( 

66 select(ClusterSubscription) 

67 .where(ClusterSubscription.user_id == context.user_id) 

68 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

69 ).scalar_one_or_none() 

70 is not None 

71 ) 

72 

73 admin_count = ( 

74 session.execute( 

75 select(cluster_admin_counts.c.count).where( 

76 cluster_admin_counts.c.cluster_id == node.official_cluster.id 

77 ) 

78 ).scalar_one_or_none() 

79 or 1 

80 ) 

81 is_admin = ( 

82 session.execute( 

83 select(ClusterSubscription) 

84 .where(ClusterSubscription.user_id == context.user_id) 

85 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

86 .where(ClusterSubscription.role == ClusterRole.admin) 

87 ).scalar_one_or_none() 

88 is not None 

89 ) 

90 

91 return communities_pb2.Community( 

92 community_id=node.id, 

93 name=node.official_cluster.name, 

94 slug=node.official_cluster.slug, 

95 description=node.official_cluster.description, 

96 created=Timestamp_from_datetime(node.created), 

97 parents=_parents_to_pb(node.id), 

98 member=is_member, 

99 admin=is_admin, 

100 member_count=member_count, 

101 admin_count=admin_count, 

102 main_page=page_to_pb(node.official_cluster.main_page, context), 

103 can_moderate=can_moderate, 

104 ) 

105 

106 

107class Communities(communities_pb2_grpc.CommunitiesServicer): 

108 def GetCommunity(self, request, context): 

109 with session_scope() as session: 

110 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

111 if not node: 

112 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

113 

114 return community_to_pb(node, context) 

115 

116 def ListCommunities(self, request, context): 

117 with session_scope() as session: 

118 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

119 next_node_id = int(request.page_token) if request.page_token else 0 

120 nodes = ( 

121 session.execute( 

122 select(Node) 

123 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0)) 

124 .where(Node.id >= next_node_id) 

125 .order_by(Node.id) 

126 .limit(page_size + 1) 

127 ) 

128 .scalars() 

129 .all() 

130 ) 

131 return communities_pb2.ListCommunitiesRes( 

132 communities=[community_to_pb(node, context) for node in nodes[:page_size]], 

133 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

134 ) 

135 

136 def ListGroups(self, request, context): 

137 with session_scope() as session: 

138 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

139 next_cluster_id = int(request.page_token) if request.page_token else 0 

140 clusters = ( 

141 session.execute( 

142 select(Cluster) 

143 .where(~Cluster.is_official_cluster) # not an official group 

144 .where(Cluster.parent_node_id == request.community_id) 

145 .where(Cluster.id >= next_cluster_id) 

146 .order_by(Cluster.id) 

147 .limit(page_size + 1) 

148 ) 

149 .scalars() 

150 .all() 

151 ) 

152 return communities_pb2.ListGroupsRes( 

153 groups=[group_to_pb(cluster, context) for cluster in clusters[:page_size]], 

154 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

155 ) 

156 

157 def ListAdmins(self, request, context): 

158 with session_scope() as session: 

159 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

160 next_admin_id = int(request.page_token) if request.page_token else 0 

161 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

162 if not node: 

163 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

164 admins = ( 

165 session.execute( 

166 select(User) 

167 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

168 .where_users_visible(context) 

169 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

170 .where(ClusterSubscription.role == ClusterRole.admin) 

171 .where(User.id >= next_admin_id) 

172 .order_by(User.id) 

173 .limit(page_size + 1) 

174 ) 

175 .scalars() 

176 .all() 

177 ) 

178 return communities_pb2.ListAdminsRes( 

179 admin_user_ids=[admin.id for admin in admins[:page_size]], 

180 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

181 ) 

182 

183 def ListMembers(self, request, context): 

184 with session_scope() as session: 

185 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

186 next_member_id = int(request.page_token) if request.page_token else 0 

187 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

188 if not node: 

189 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

190 members = ( 

191 session.execute( 

192 select(User) 

193 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

194 .where_users_visible(context) 

195 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

196 .where(User.id >= next_member_id) 

197 .order_by(User.id) 

198 .limit(page_size + 1) 

199 ) 

200 .scalars() 

201 .all() 

202 ) 

203 return communities_pb2.ListMembersRes( 

204 member_user_ids=[member.id for member in members[:page_size]], 

205 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

206 ) 

207 

208 def ListNearbyUsers(self, request, context): 

209 with session_scope() as session: 

210 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

211 next_nearby_id = int(request.page_token) if request.page_token else 0 

212 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

213 if not node: 

214 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

215 nearbys = ( 

216 session.execute( 

217 select(User) 

218 .where_users_visible(context) 

219 .where(func.ST_Contains(node.geom, User.geom)) 

220 .where(User.id >= next_nearby_id) 

221 .order_by(User.id) 

222 .limit(page_size + 1) 

223 ) 

224 .scalars() 

225 .all() 

226 ) 

227 return communities_pb2.ListNearbyUsersRes( 

228 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]], 

229 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None, 

230 ) 

231 

232 def ListPlaces(self, request, context): 

233 with session_scope() as session: 

234 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

235 next_page_id = int(request.page_token) if request.page_token else 0 

236 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

237 if not node: 

238 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

239 places = ( 

240 node.official_cluster.owned_pages.where(Page.type == PageType.place) 

241 .where(Page.id >= next_page_id) 

242 .order_by(Page.id) 

243 .limit(page_size + 1) 

244 .all() 

245 ) 

246 return communities_pb2.ListPlacesRes( 

247 places=[page_to_pb(page, context) for page in places[:page_size]], 

248 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

249 ) 

250 

251 def ListGuides(self, request, context): 

252 with session_scope() as session: 

253 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

254 next_page_id = int(request.page_token) if request.page_token else 0 

255 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

256 if not node: 

257 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

258 guides = ( 

259 node.official_cluster.owned_pages.where(Page.type == PageType.guide) 

260 .where(Page.id >= next_page_id) 

261 .order_by(Page.id) 

262 .limit(page_size + 1) 

263 .all() 

264 ) 

265 return communities_pb2.ListGuidesRes( 

266 guides=[page_to_pb(page, context) for page in guides[:page_size]], 

267 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

268 ) 

269 

270 def ListEvents(self, request, context): 

271 with session_scope() as session: 

272 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

273 # the page token is a unix timestamp of where we left off 

274 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

275 

276 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

277 if not node: 

278 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

279 

280 # for communities, we list events owned by this community or for which this is a parent 

281 occurrences = ( 

282 select(EventOccurrence) 

283 .join(Event, Event.id == EventOccurrence.event_id) 

284 .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node)) 

285 ) 

286 

287 if request.past: 

288 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

289 EventOccurrence.start_time.desc() 

290 ) 

291 else: 

292 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

293 EventOccurrence.start_time.asc() 

294 ) 

295 

296 occurrences = occurrences.limit(page_size + 1) 

297 occurrences = session.execute(occurrences).scalars().all() 

298 

299 return communities_pb2.ListEventsRes( 

300 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

301 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

302 ) 

303 

304 def ListDiscussions(self, request, context): 

305 with session_scope() as session: 

306 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

307 next_page_id = int(request.page_token) if request.page_token else 0 

308 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

309 if not node: 

310 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

311 discussions = ( 

312 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0)) 

313 .order_by(Discussion.id.desc()) 

314 .limit(page_size + 1) 

315 .all() 

316 ) 

317 return communities_pb2.ListDiscussionsRes( 

318 discussions=[discussion_to_pb(discussion, context) for discussion in discussions[:page_size]], 

319 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

320 ) 

321 

322 def JoinCommunity(self, request, context): 

323 with session_scope() as session: 

324 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

325 if not node: 

326 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

327 

328 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

329 if current_membership: 

330 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY) 

331 

332 node.official_cluster.cluster_subscriptions.append( 

333 ClusterSubscription( 

334 user_id=context.user_id, 

335 role=ClusterRole.member, 

336 ) 

337 ) 

338 

339 return empty_pb2.Empty() 

340 

341 def LeaveCommunity(self, request, context): 

342 with session_scope() as session: 

343 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

344 if not node: 

345 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

346 

347 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

348 

349 if not current_membership: 

350 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY) 

351 

352 if context.user_id in node.contained_user_ids: 

353 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY) 

354 

355 session.execute( 

356 delete(ClusterSubscription) 

357 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

358 .where(ClusterSubscription.user_id == context.user_id) 

359 ) 

360 

361 return empty_pb2.Empty() 

362 

363 def ListUserCommunities(self, request, context): 

364 with session_scope() as session: 

365 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

366 next_node_id = int(request.page_token) if request.page_token else 0 

367 user_id = request.user_id or context.user_id 

368 nodes = ( 

369 session.execute( 

370 select(Node) 

371 .join(Cluster, Cluster.parent_node_id == Node.id) 

372 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

373 .where(ClusterSubscription.user_id == user_id) 

374 .where(Cluster.is_official_cluster) 

375 .where(Node.id >= next_node_id) 

376 .order_by(Node.id) 

377 .limit(page_size + 1) 

378 ) 

379 .scalars() 

380 .all() 

381 ) 

382 

383 return communities_pb2.ListUserCommunitiesRes( 

384 communities=[community_to_pb(node, context) for node in nodes[:page_size]], 

385 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

386 )