Coverage for src/couchers/servicers/communities.py: 83%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

145 statements  

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func, or_ 

7 

8from couchers import errors 

9from couchers.db import can_moderate_node, get_node_parents_recursively, session_scope 

10from couchers.models import ( 

11 Cluster, 

12 ClusterRole, 

13 ClusterSubscription, 

14 Discussion, 

15 Event, 

16 EventOccurrence, 

17 Node, 

18 Page, 

19 PageType, 

20 User, 

21) 

22from couchers.servicers.discussions import discussion_to_pb 

23from couchers.servicers.events import event_to_pb 

24from couchers.servicers.groups import group_to_pb 

25from couchers.servicers.pages import page_to_pb 

26from couchers.sql import couchers_select as select 

27from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

28from proto import communities_pb2, communities_pb2_grpc, groups_pb2 

29 

30logger = logging.getLogger(__name__) 

31 

32MAX_PAGINATION_LENGTH = 25 

33 

34 

35def _parents_to_pb(node_id): 

36 with session_scope() as session: 

37 parents = get_node_parents_recursively(session, node_id) 

38 return [ 

39 groups_pb2.Parent( 

40 community=groups_pb2.CommunityParent( 

41 community_id=node_id, 

42 name=cluster.name, 

43 slug=cluster.slug, 

44 description=cluster.description, 

45 ) 

46 ) 

47 for node_id, parent_node_id, level, cluster in parents 

48 ] 

49 

50 

51def community_to_pb(node: Node, context): 

52 with session_scope() as session: 

53 can_moderate = can_moderate_node(session, context.user_id, node.id) 

54 

55 member_count = session.execute( 

56 select(func.count()) 

57 .select_from(ClusterSubscription) 

58 .where_users_column_visible(context, ClusterSubscription.user_id) 

59 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

60 ).scalar_one() 

61 is_member = ( 

62 session.execute( 

63 select(ClusterSubscription) 

64 .where(ClusterSubscription.user_id == context.user_id) 

65 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

66 ).scalar_one_or_none() 

67 is not None 

68 ) 

69 

70 admin_count = session.execute( 

71 select(func.count()) 

72 .select_from(ClusterSubscription) 

73 .where_users_column_visible(context, ClusterSubscription.user_id) 

74 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

75 .where(ClusterSubscription.role == ClusterRole.admin) 

76 ).scalar_one() 

77 is_admin = ( 

78 session.execute( 

79 select(ClusterSubscription) 

80 .where(ClusterSubscription.user_id == context.user_id) 

81 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

82 .where(ClusterSubscription.role == ClusterRole.admin) 

83 ).scalar_one_or_none() 

84 is not None 

85 ) 

86 

87 return communities_pb2.Community( 

88 community_id=node.id, 

89 name=node.official_cluster.name, 

90 slug=node.official_cluster.slug, 

91 description=node.official_cluster.description, 

92 created=Timestamp_from_datetime(node.created), 

93 parents=_parents_to_pb(node.id), 

94 member=is_member, 

95 admin=is_admin, 

96 member_count=member_count, 

97 admin_count=admin_count, 

98 main_page=page_to_pb(node.official_cluster.main_page, context), 

99 can_moderate=can_moderate, 

100 ) 

101 

102 

103class Communities(communities_pb2_grpc.CommunitiesServicer): 

104 def GetCommunity(self, request, context): 

105 with session_scope() as session: 

106 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

107 if not node: 

108 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

109 

110 return community_to_pb(node, context) 

111 

112 def ListCommunities(self, request, context): 

113 with session_scope() as session: 

114 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

115 next_node_id = int(request.page_token) if request.page_token else 0 

116 nodes = ( 

117 session.execute( 

118 select(Node) 

119 .where(or_(Node.parent_node_id == request.community_id, request.community_id == 0)) 

120 .where(Node.id >= next_node_id) 

121 .order_by(Node.id) 

122 .limit(page_size + 1) 

123 ) 

124 .scalars() 

125 .all() 

126 ) 

127 return communities_pb2.ListCommunitiesRes( 

128 communities=[community_to_pb(node, context) for node in nodes[:page_size]], 

129 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

130 ) 

131 

132 def ListGroups(self, request, context): 

133 with session_scope() as session: 

134 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

135 next_cluster_id = int(request.page_token) if request.page_token else 0 

136 clusters = ( 

137 session.execute( 

138 select(Cluster) 

139 .where(~Cluster.is_official_cluster) # not an official group 

140 .where(Cluster.parent_node_id == request.community_id) 

141 .where(Cluster.id >= next_cluster_id) 

142 .order_by(Cluster.id) 

143 .limit(page_size + 1) 

144 ) 

145 .scalars() 

146 .all() 

147 ) 

148 return communities_pb2.ListGroupsRes( 

149 groups=[group_to_pb(cluster, context) for cluster in clusters[:page_size]], 

150 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

151 ) 

152 

153 def ListAdmins(self, request, context): 

154 with session_scope() as session: 

155 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

156 next_admin_id = int(request.page_token) if request.page_token else 0 

157 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

158 if not node: 

159 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

160 admins = ( 

161 session.execute( 

162 select(User) 

163 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

164 .where_users_visible(context) 

165 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

166 .where(ClusterSubscription.role == ClusterRole.admin) 

167 .where(User.id >= next_admin_id) 

168 .order_by(User.id) 

169 .limit(page_size + 1) 

170 ) 

171 .scalars() 

172 .all() 

173 ) 

174 return communities_pb2.ListAdminsRes( 

175 admin_user_ids=[admin.id for admin in admins[:page_size]], 

176 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

177 ) 

178 

179 def ListMembers(self, request, context): 

180 with session_scope() as session: 

181 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

182 next_member_id = int(request.page_token) if request.page_token else 0 

183 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

184 if not node: 

185 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

186 members = ( 

187 session.execute( 

188 select(User) 

189 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

190 .where_users_visible(context) 

191 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

192 .where(User.id >= next_member_id) 

193 .order_by(User.id) 

194 .limit(page_size + 1) 

195 ) 

196 .scalars() 

197 .all() 

198 ) 

199 return communities_pb2.ListMembersRes( 

200 member_user_ids=[member.id for member in members[:page_size]], 

201 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

202 ) 

203 

204 def ListNearbyUsers(self, request, context): 

205 with session_scope() as session: 

206 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

207 next_nearby_id = int(request.page_token) if request.page_token else 0 

208 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

209 if not node: 

210 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

211 nearbys = ( 

212 session.execute( 

213 select(User) 

214 .where_users_visible(context) 

215 .where(func.ST_Contains(node.geom, User.geom)) 

216 .where(User.id >= next_nearby_id) 

217 .order_by(User.id) 

218 .limit(page_size + 1) 

219 ) 

220 .scalars() 

221 .all() 

222 ) 

223 return communities_pb2.ListNearbyUsersRes( 

224 nearby_user_ids=[nearby.id for nearby in nearbys[:page_size]], 

225 next_page_token=str(nearbys[-1].id) if len(nearbys) > page_size else None, 

226 ) 

227 

228 def ListPlaces(self, request, context): 

229 with session_scope() as session: 

230 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

231 next_page_id = int(request.page_token) if request.page_token else 0 

232 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

233 if not node: 

234 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

235 places = ( 

236 node.official_cluster.owned_pages.where(Page.type == PageType.place) 

237 .where(Page.id >= next_page_id) 

238 .order_by(Page.id) 

239 .limit(page_size + 1) 

240 .all() 

241 ) 

242 return communities_pb2.ListPlacesRes( 

243 places=[page_to_pb(page, context) for page in places[:page_size]], 

244 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

245 ) 

246 

247 def ListGuides(self, request, context): 

248 with session_scope() as session: 

249 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

250 next_page_id = int(request.page_token) if request.page_token else 0 

251 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

252 if not node: 

253 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

254 guides = ( 

255 node.official_cluster.owned_pages.where(Page.type == PageType.guide) 

256 .where(Page.id >= next_page_id) 

257 .order_by(Page.id) 

258 .limit(page_size + 1) 

259 .all() 

260 ) 

261 return communities_pb2.ListGuidesRes( 

262 guides=[page_to_pb(page, context) for page in guides[:page_size]], 

263 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

264 ) 

265 

266 def ListEvents(self, request, context): 

267 with session_scope() as session: 

268 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

269 # the page token is a unix timestamp of where we left off 

270 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

271 

272 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

273 if not node: 

274 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

275 

276 # for communities, we list events owned by this community or for which this is a parent 

277 occurrences = ( 

278 select(EventOccurrence) 

279 .join(Event, Event.id == EventOccurrence.event_id) 

280 .where(or_(Event.owner_cluster == node.official_cluster, Event.parent_node == node)) 

281 ) 

282 

283 if request.past: 

284 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

285 EventOccurrence.start_time.desc() 

286 ) 

287 else: 

288 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

289 EventOccurrence.start_time.asc() 

290 ) 

291 

292 occurrences = occurrences.limit(page_size + 1) 

293 occurrences = session.execute(occurrences).scalars().all() 

294 

295 return communities_pb2.ListEventsRes( 

296 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

297 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

298 ) 

299 

300 def ListDiscussions(self, request, context): 

301 with session_scope() as session: 

302 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

303 next_page_id = int(request.page_token) if request.page_token else 0 

304 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

305 if not node: 

306 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

307 discussions = ( 

308 node.official_cluster.owned_discussions.where(or_(Discussion.id <= next_page_id, next_page_id == 0)) 

309 .order_by(Discussion.id.desc()) 

310 .limit(page_size + 1) 

311 .all() 

312 ) 

313 return communities_pb2.ListDiscussionsRes( 

314 discussions=[discussion_to_pb(discussion, context) for discussion in discussions[:page_size]], 

315 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

316 ) 

317 

318 def JoinCommunity(self, request, context): 

319 with session_scope() as session: 

320 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

321 if not node: 

322 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

323 

324 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

325 if current_membership: 

326 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ALREADY_IN_COMMUNITY) 

327 

328 node.official_cluster.cluster_subscriptions.append( 

329 ClusterSubscription( 

330 user_id=context.user_id, 

331 role=ClusterRole.member, 

332 ) 

333 ) 

334 

335 return empty_pb2.Empty() 

336 

337 def LeaveCommunity(self, request, context): 

338 with session_scope() as session: 

339 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

340 if not node: 

341 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

342 

343 current_membership = node.official_cluster.members.where(User.id == context.user_id).one_or_none() 

344 

345 if not current_membership: 

346 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.NOT_IN_COMMUNITY) 

347 

348 if context.user_id in node.contained_user_ids: 

349 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.CANNOT_LEAVE_CONTAINING_COMMUNITY) 

350 

351 session.execute( 

352 delete(ClusterSubscription) 

353 .where(ClusterSubscription.cluster_id == node.official_cluster.id) 

354 .where(ClusterSubscription.user_id == context.user_id) 

355 ) 

356 

357 return empty_pb2.Empty() 

358 

359 def ListUserCommunities(self, request, context): 

360 with session_scope() as session: 

361 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

362 next_node_id = int(request.page_token) if request.page_token else 0 

363 user_id = request.user_id or context.user_id 

364 nodes = ( 

365 session.execute( 

366 select(Node) 

367 .join(Cluster, Cluster.parent_node_id == Node.id) 

368 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

369 .where(ClusterSubscription.user_id == user_id) 

370 .where(Cluster.is_official_cluster) 

371 .where(Node.id >= next_node_id) 

372 .order_by(Node.id) 

373 .limit(page_size + 1) 

374 ) 

375 .scalars() 

376 .all() 

377 ) 

378 

379 return communities_pb2.ListUserCommunitiesRes( 

380 communities=[community_to_pb(node, context) for node in nodes[:page_size]], 

381 next_page_token=str(nodes[-1].id) if len(nodes) > page_size else None, 

382 )