Coverage for src/couchers/servicers/pages.py: 95%

148 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-10-15 13:03 +0000

1import grpc 

2 

3from couchers import errors 

4from couchers.db import can_moderate_at, can_moderate_node, get_parent_node_at_location 

5from couchers.models import Cluster, Node, Page, PageType, PageVersion, Thread, Upload, User 

6from couchers.servicers.threads import thread_to_pb 

7from couchers.sql import couchers_select as select 

8from couchers.utils import Timestamp_from_datetime, create_coordinate, remove_duplicates_retain_order 

9from proto import pages_pb2, pages_pb2_grpc 

10 

# Upper bound on the page size of the paginated List* calls in this module.
MAX_PAGINATION_LENGTH = 25

# API (protobuf) page type -> database page type
pagetype2sql = {
    pages_pb2.PAGE_TYPE_PLACE: PageType.place,
    pages_pb2.PAGE_TYPE_GUIDE: PageType.guide,
    pages_pb2.PAGE_TYPE_MAIN_PAGE: PageType.main_page,
}

# database page type -> API (protobuf) page type, i.e. the table above turned around
pagetype2api = {sql_type: api_type for api_type, sql_type in pagetype2sql.items()}

24 

25 

def _is_page_owner(page: Page, user_id):
    """
    Tells whether the given user counts as an owner of the page

    A page that has an owner user is owned by exactly that user. Any other page is owned by a cluster, in which case
    every admin of that cluster counts as an owner.
    """
    if not page.owner_user:
        # no owner user, so the page belongs to a cluster: look the user up among that cluster's admins
        matching_admin = page.owner_cluster.admins.where(User.id == user_id).one_or_none()
        return matching_admin is not None

    return page.owner_user_id == user_id

34 

35 

def _can_moderate_page(session, page: Page, user_id):
    """
    Tells whether the user is allowed to moderate this page

    Any one of the following is enough; they are tried in this order:

    1. the latest version of the page has a location and the user can moderate at that location
    2. the page is owned by a cluster and the user can moderate that cluster's parent node
    3. the user can moderate the parent node of the page itself
    """
    geom = page.versions[-1].geom
    if geom is not None and can_moderate_at(session, user_id, geom):
        return True

    owner_cluster = page.owner_cluster
    if owner_cluster is not None and can_moderate_node(session, user_id, owner_cluster.parent_node_id):
        return True

    # last resort: the node the page itself hangs under
    return can_moderate_node(session, user_id, page.parent_node_id)

53 

54 

def page_to_pb(session, page: Page, context):
    """
    Converts a page into its `pages_pb2.Page` message, from the point of view of the calling user

    The slug, title, content, photo, address and location come from the last version of the page, the creation time
    from the first version. `can_edit` and `can_moderate` are computed for `context.user_id`.
    """
    oldest_version = page.versions[0]
    newest_version = page.versions[-1]

    # a cluster-owned page is reported as owned by a community if the cluster is official, by a group otherwise
    owner_community_id = None
    owner_group_id = None
    cluster = page.owner_cluster
    if cluster and cluster.is_official_cluster:
        owner_community_id = cluster.parent_node_id
    elif cluster:
        owner_group_id = cluster.id

    can_moderate = _can_moderate_page(session, page, context.user_id)

    # optional fields stay None unless the last version carries them
    photo_url = None
    if newest_version.photo_key:
        photo_url = newest_version.photo.full_url

    location = None
    if newest_version.coordinates:
        location = pages_pb2.Coordinate(
            lat=newest_version.coordinates[0],
            lng=newest_version.coordinates[1],
        )

    # one entry per version, passed through remove_duplicates_retain_order
    editor_user_ids = remove_duplicates_retain_order([version.editor_user_id for version in page.versions])

    # owners can edit, and so can anyone who can moderate
    can_edit = _is_page_owner(page, context.user_id) or can_moderate

    return pages_pb2.Page(
        page_id=page.id,
        type=pagetype2api[page.type],
        slug=newest_version.slug,
        created=Timestamp_from_datetime(oldest_version.created),
        last_edited=Timestamp_from_datetime(newest_version.created),
        last_editor_user_id=newest_version.editor_user_id,
        creator_user_id=page.creator_user_id,
        owner_user_id=page.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(session, page.thread_id),
        title=newest_version.title,
        content=newest_version.content,
        photo_url=photo_url,
        address=newest_version.address,
        location=location,
        editor_user_ids=editor_user_ids,
        can_edit=can_edit,
        can_moderate=can_moderate,
    )

97 

98 

99class Pages(pages_pb2_grpc.PagesServicer): 

100 def CreatePlace(self, request, context, session): 

101 if not request.title: 

102 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE) 

103 if not request.content: 

104 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT) 

105 if not request.address: 

106 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS) 

107 if not request.HasField("location"): 

108 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_LOCATION) 

109 if request.location.lat == 0 and request.location.lng == 0: 

110 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

111 

112 geom = create_coordinate(request.location.lat, request.location.lng) 

113 

114 if ( 

115 request.photo_key 

116 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

117 ): 

118 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

119 

120 page = Page( 

121 parent_node=get_parent_node_at_location(session, geom), 

122 type=PageType.place, 

123 creator_user_id=context.user_id, 

124 owner_user_id=context.user_id, 

125 thread=Thread(), 

126 ) 

127 session.add(page) 

128 session.flush() 

129 page_version = PageVersion( 

130 page=page, 

131 editor_user_id=context.user_id, 

132 title=request.title, 

133 content=request.content, 

134 photo_key=request.photo_key if request.photo_key else None, 

135 address=request.address, 

136 geom=geom, 

137 ) 

138 session.add(page_version) 

139 session.commit() 

140 return page_to_pb(session, page, context) 

141 

142 def CreateGuide(self, request, context, session): 

143 if not request.title: 

144 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE) 

145 if not request.content: 

146 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT) 

147 if request.address and request.HasField("location"): 

148 address = request.address 

149 if request.location.lat == 0 and request.location.lng == 0: 

150 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

151 geom = create_coordinate(request.location.lat, request.location.lng) 

152 elif not request.address and not request.HasField("location"): 

153 address = None 

154 geom = None 

155 else: 

156 # you have to have both or neither 

157 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_GUIDE_LOCATION) 

158 

159 if not request.parent_community_id: 

160 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_PARENT) 

161 

162 parent_node = session.execute(select(Node).where(Node.id == request.parent_community_id)).scalar_one_or_none() 

163 

164 if not parent_node: 

165 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND) 

166 

167 if ( 

168 request.photo_key 

169 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

170 ): 

171 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

172 

173 page = Page( 

174 parent_node=parent_node, 

175 type=PageType.guide, 

176 creator_user_id=context.user_id, 

177 owner_user_id=context.user_id, 

178 thread=Thread(), 

179 ) 

180 session.add(page) 

181 session.flush() 

182 page_version = PageVersion( 

183 page=page, 

184 editor_user_id=context.user_id, 

185 title=request.title, 

186 content=request.content, 

187 photo_key=request.photo_key if request.photo_key else None, 

188 address=address, 

189 geom=geom, 

190 ) 

191 session.add(page_version) 

192 session.commit() 

193 return page_to_pb(session, page, context) 

194 

195 def GetPage(self, request, context, session): 

196 page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none() 

197 if not page: 

198 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND) 

199 

200 return page_to_pb(session, page, context) 

201 

202 def UpdatePage(self, request, context, session): 

203 page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none() 

204 if not page: 

205 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND) 

206 

207 if not _is_page_owner(page, context.user_id) and not _can_moderate_page(session, page, context.user_id): 

208 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_UPDATE_PERMISSION_DENIED) 

209 

210 current_version = page.versions[-1] 

211 

212 page_version = PageVersion( 

213 page=page, 

214 editor_user_id=context.user_id, 

215 title=current_version.title, 

216 content=current_version.content, 

217 photo_key=current_version.photo_key, 

218 address=current_version.address, 

219 geom=current_version.geom, 

220 ) 

221 

222 if request.HasField("title"): 

223 if not request.title.value: 

224 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE) 

225 page_version.title = request.title.value 

226 

227 if request.HasField("content"): 

228 if not request.content.value: 

229 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT) 

230 page_version.content = request.content.value 

231 

232 if request.HasField("photo_key"): 

233 if not request.photo_key.value: 

234 page_version.photo_key = None 

235 else: 

236 if not session.execute( 

237 select(Upload).where(Upload.key == request.photo_key.value) 

238 ).scalar_one_or_none(): 

239 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

240 page_version.photo_key = request.photo_key.value 

241 

242 if request.HasField("address"): 

243 if not request.address.value: 

244 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS) 

245 page_version.address = request.address.value 

246 

247 if request.HasField("location"): 

248 page_version.geom = create_coordinate(request.location.lat, request.location.lng) 

249 

250 session.add(page_version) 

251 session.commit() 

252 return page_to_pb(session, page, context) 

253 

254 def TransferPage(self, request, context, session): 

255 page = session.execute( 

256 select(Page).where(Page.id == request.page_id).where(Page.type != PageType.main_page) 

257 ).scalar_one_or_none() 

258 

259 if not page: 

260 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND) 

261 

262 if not _is_page_owner(page, context.user_id) and not _can_moderate_page(session, page, context.user_id): 

263 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_TRANSFER_PERMISSION_DENIED) 

264 

265 if request.WhichOneof("new_owner") == "new_owner_group_id": 

266 cluster = session.execute( 

267 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id) 

268 ).scalar_one_or_none() 

269 elif request.WhichOneof("new_owner") == "new_owner_community_id": 

270 cluster = session.execute( 

271 select(Cluster) 

272 .where(Cluster.parent_node_id == request.new_owner_community_id) 

273 .where(Cluster.is_official_cluster) 

274 ).scalar_one_or_none() 

275 else: 

276 # i'm not sure if this needs to be checked 

277 context.abort(grpc.StatusCode.UNKNOWN, errors.UNKNOWN_ERROR) 

278 

279 if not cluster: 

280 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND) 

281 

282 page.owner_user = None 

283 page.owner_cluster = cluster 

284 

285 session.commit() 

286 return page_to_pb(session, page, context) 

287 

288 def ListUserPlaces(self, request, context, session): 

289 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

290 next_page_id = int(request.page_token) if request.page_token else 0 

291 user_id = request.user_id or context.user_id 

292 places = ( 

293 session.execute( 

294 select(Page) 

295 .where(Page.owner_user_id == user_id) 

296 .where(Page.type == PageType.place) 

297 .where(Page.id >= next_page_id) 

298 .order_by(Page.id) 

299 .limit(page_size + 1) 

300 ) 

301 .scalars() 

302 .all() 

303 ) 

304 return pages_pb2.ListUserPlacesRes( 

305 places=[page_to_pb(session, page, context) for page in places[:page_size]], 

306 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

307 ) 

308 

309 def ListUserGuides(self, request, context, session): 

310 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

311 next_page_id = int(request.page_token) if request.page_token else 0 

312 user_id = request.user_id or context.user_id 

313 guides = ( 

314 session.execute( 

315 select(Page) 

316 .where(Page.owner_user_id == user_id) 

317 .where(Page.type == PageType.guide) 

318 .where(Page.id >= next_page_id) 

319 .order_by(Page.id) 

320 .limit(page_size + 1) 

321 ) 

322 .scalars() 

323 .all() 

324 ) 

325 return pages_pb2.ListUserGuidesRes( 

326 guides=[page_to_pb(session, page, context) for page in guides[:page_size]], 

327 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

328 )