Coverage for src/couchers/servicers/groups.py: 80%

108 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import delete, func 

7 

8from couchers.db import can_moderate_node, get_node_parents_recursively 

9from couchers.models import ( 

10 Cluster, 

11 ClusterRole, 

12 ClusterSubscription, 

13 Discussion, 

14 Event, 

15 EventOccurrence, 

16 Page, 

17 PageType, 

18 User, 

19) 

20from couchers.proto import groups_pb2, groups_pb2_grpc 

21from couchers.servicers.discussions import discussion_to_pb 

22from couchers.servicers.events import event_to_pb 

23from couchers.servicers.pages import page_to_pb 

24from couchers.sql import couchers_select as select 

25from couchers.utils import Timestamp_from_datetime, dt_from_millis, millis_from_dt, now 

26 

27logger = logging.getLogger(__name__) 

28 

29MAX_PAGINATION_LENGTH = 25 

30 

31 

32def _parents_to_pb(session, cluster: Cluster): 

33 parents = get_node_parents_recursively(session, cluster.parent_node_id) 

34 return [ 

35 groups_pb2.Parent( 

36 community=groups_pb2.CommunityParent( 

37 community_id=node_id, 

38 name=cluster.name, 

39 slug=cluster.slug, 

40 description=cluster.description, 

41 ) 

42 ) 

43 for node_id, parent_node_id, level, cluster in parents 

44 ] + [ 

45 groups_pb2.Parent( 

46 group=groups_pb2.GroupParent( 

47 group_id=cluster.id, 

48 name=cluster.name, 

49 slug=cluster.slug, 

50 description=cluster.description, 

51 ) 

52 ) 

53 ] 

54 

55 

56def group_to_pb(session, cluster: Cluster, context): 

57 can_moderate = can_moderate_node(session, context.user_id, cluster.parent_node_id) 

58 

59 member_count = session.execute( 

60 select(func.count()) 

61 .select_from(ClusterSubscription) 

62 .where_users_column_visible(context, ClusterSubscription.user_id) 

63 .where(ClusterSubscription.cluster_id == cluster.id) 

64 ).scalar_one() 

65 is_member = ( 

66 session.execute( 

67 select(ClusterSubscription) 

68 .where(ClusterSubscription.user_id == context.user_id) 

69 .where(ClusterSubscription.cluster_id == cluster.id) 

70 ).scalar_one_or_none() 

71 is not None 

72 ) 

73 

74 admin_count = session.execute( 

75 select(func.count()) 

76 .select_from(ClusterSubscription) 

77 .where_users_column_visible(context, ClusterSubscription.user_id) 

78 .where(ClusterSubscription.cluster_id == cluster.id) 

79 .where(ClusterSubscription.role == ClusterRole.admin) 

80 ).scalar_one() 

81 is_admin = ( 

82 session.execute( 

83 select(ClusterSubscription) 

84 .where(ClusterSubscription.user_id == context.user_id) 

85 .where(ClusterSubscription.cluster_id == cluster.id) 

86 .where(ClusterSubscription.role == ClusterRole.admin) 

87 ).scalar_one_or_none() 

88 is not None 

89 ) 

90 

91 return groups_pb2.Group( 

92 group_id=cluster.id, 

93 name=cluster.name, 

94 slug=cluster.slug, 

95 description=cluster.description, 

96 created=Timestamp_from_datetime(cluster.created), 

97 parents=_parents_to_pb(session, cluster), 

98 main_page=page_to_pb(session, cluster.main_page, context), 

99 member=is_member, 

100 admin=is_admin, 

101 member_count=member_count, 

102 admin_count=admin_count, 

103 can_moderate=can_moderate, 

104 ) 

105 

106 

107class Groups(groups_pb2_grpc.GroupsServicer): 

108 def GetGroup(self, request, context, session): 

109 cluster = session.execute( 

110 select(Cluster) 

111 .where(~Cluster.is_official_cluster) # not an official group 

112 .where(Cluster.id == request.group_id) 

113 ).scalar_one_or_none() 

114 if not cluster: 

115 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

116 

117 return group_to_pb(session, cluster, context) 

118 

119 def ListAdmins(self, request, context, session): 

120 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

121 next_admin_id = int(request.page_token) if request.page_token else 0 

122 cluster = session.execute( 

123 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

124 ).scalar_one_or_none() 

125 if not cluster: 

126 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

127 

128 admins = ( 

129 session.execute( 

130 select(User) 

131 .where_users_visible(context) 

132 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

133 .where(ClusterSubscription.cluster_id == cluster.id) 

134 .where(ClusterSubscription.role == ClusterRole.admin) 

135 .where(User.id >= next_admin_id) 

136 .order_by(User.id) 

137 .limit(page_size + 1) 

138 ) 

139 .scalars() 

140 .all() 

141 ) 

142 return groups_pb2.ListAdminsRes( 

143 admin_user_ids=[admin.id for admin in admins[:page_size]], 

144 next_page_token=str(admins[-1].id) if len(admins) > page_size else None, 

145 ) 

146 

147 def ListMembers(self, request, context, session): 

148 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

149 next_member_id = int(request.page_token) if request.page_token else 0 

150 cluster = session.execute( 

151 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

152 ).scalar_one_or_none() 

153 if not cluster: 

154 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

155 

156 members = ( 

157 session.execute( 

158 select(User) 

159 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

160 .where_users_visible(context) 

161 .where(ClusterSubscription.cluster_id == cluster.id) 

162 .where(User.id >= next_member_id) 

163 .order_by(User.id) 

164 .limit(page_size + 1) 

165 ) 

166 .scalars() 

167 .all() 

168 ) 

169 return groups_pb2.ListMembersRes( 

170 member_user_ids=[member.id for member in members[:page_size]], 

171 next_page_token=str(members[-1].id) if len(members) > page_size else None, 

172 ) 

173 

174 def ListPlaces(self, request, context, session): 

175 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

176 next_page_id = int(request.page_token) if request.page_token else 0 

177 cluster = session.execute( 

178 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

179 ).scalar_one_or_none() 

180 if not cluster: 

181 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

182 places = ( 

183 cluster.owned_pages.where(Page.type == PageType.place) 

184 .where(Page.id >= next_page_id) 

185 .order_by(Page.id) 

186 .limit(page_size + 1) 

187 .all() 

188 ) 

189 return groups_pb2.ListPlacesRes( 

190 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

191 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

192 ) 

193 

194 def ListGuides(self, request, context, session): 

195 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

196 next_page_id = int(request.page_token) if request.page_token else 0 

197 cluster = session.execute( 

198 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

199 ).scalar_one_or_none() 

200 if not cluster: 

201 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

202 guides = ( 

203 cluster.owned_pages.where(Page.type == PageType.guide) 

204 .where(Page.id >= next_page_id) 

205 .order_by(Page.id) 

206 .limit(page_size + 1) 

207 .all() 

208 ) 

209 return groups_pb2.ListGuidesRes( 

210 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

211 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

212 ) 

213 

214 def ListEvents(self, request, context, session): 

215 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

216 # the page token is a unix timestamp of where we left off 

217 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

218 

219 cluster = session.execute( 

220 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

221 ).scalar_one_or_none() 

222 if not cluster: 

223 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

224 

225 occurrences = ( 

226 select(EventOccurrence) 

227 .join(Event, Event.id == EventOccurrence.event_id) 

228 .where(Event.owner_cluster == cluster) 

229 ) 

230 

231 if not request.past: 

232 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

233 EventOccurrence.start_time.asc() 

234 ) 

235 else: 

236 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

237 EventOccurrence.start_time.desc() 

238 ) 

239 

240 occurrences = occurrences.limit(page_size + 1) 

241 occurrences = session.execute(occurrences).scalars().all() 

242 

243 return groups_pb2.ListEventsRes( 

244 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

245 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

246 ) 

247 

248 def ListDiscussions(self, request, context, session): 

249 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

250 next_page_id = int(request.page_token) if request.page_token else 0 

251 cluster = session.execute( 

252 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

253 ).scalar_one_or_none() 

254 if not cluster: 

255 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

256 discussions = ( 

257 cluster.owned_discussions.where(Discussion.id >= next_page_id) 

258 .order_by(Discussion.id) 

259 .limit(page_size + 1) 

260 .all() 

261 ) 

262 return groups_pb2.ListDiscussionsRes( 

263 discussions=[discussion_to_pb(session, discussion, context) for discussion in discussions[:page_size]], 

264 next_page_token=str(discussions[-1].id) if len(discussions) > page_size else None, 

265 ) 

266 

267 def JoinGroup(self, request, context, session): 

268 cluster = session.execute( 

269 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

270 ).scalar_one_or_none() 

271 if not cluster: 

272 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

273 

274 user_in_group = cluster.members.where(User.id == context.user_id).one_or_none() 

275 if user_in_group: 

276 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "already_in_group") 

277 

278 cluster.cluster_subscriptions.append( 

279 ClusterSubscription( 

280 user_id=context.user_id, 

281 role=ClusterRole.member, 

282 ) 

283 ) 

284 

285 return empty_pb2.Empty() 

286 

287 def LeaveGroup(self, request, context, session): 

288 cluster = session.execute( 

289 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.group_id) 

290 ).scalar_one_or_none() 

291 if not cluster: 

292 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_not_found") 

293 

294 user_in_group = cluster.members.where(User.id == context.user_id).one_or_none() 

295 if not user_in_group: 

296 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "not_in_group") 

297 

298 session.execute( 

299 delete(ClusterSubscription) 

300 .where(ClusterSubscription.cluster_id == request.group_id) 

301 .where(ClusterSubscription.user_id == context.user_id) 

302 ) 

303 

304 return empty_pb2.Empty() 

305 

306 def ListUserGroups(self, request, context, session): 

307 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

308 next_cluster_id = int(request.page_token) if request.page_token else 0 

309 user_id = request.user_id or context.user_id 

310 clusters = ( 

311 session.execute( 

312 select(Cluster) 

313 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

314 .where(ClusterSubscription.user_id == user_id) 

315 .where(~Cluster.is_official_cluster) # not an official group 

316 .where(Cluster.id >= next_cluster_id) 

317 .order_by(Cluster.id) 

318 .limit(page_size + 1) 

319 ) 

320 .scalars() 

321 .all() 

322 ) 

323 return groups_pb2.ListUserGroupsRes( 

324 groups=[group_to_pb(session, cluster, context) for cluster in clusters[:page_size]], 

325 next_page_token=str(clusters[-1].id) if len(clusters) > page_size else None, 

326 )