Coverage for src/couchers/servicers/pages.py: 95%

156 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-07-22 17:19 +0000

1import grpc 

2 

3from couchers import errors 

4from couchers.db import can_moderate_at, can_moderate_node, get_parent_node_at_location, session_scope 

5from couchers.models import Cluster, Node, Page, PageType, PageVersion, Thread, Upload, User 

6from couchers.servicers.threads import thread_to_pb 

7from couchers.sql import couchers_select as select 

8from couchers.utils import Timestamp_from_datetime, create_coordinate, remove_duplicates_retain_order 

9from proto import pages_pb2, pages_pb2_grpc 

10 

# upper bound on how many pages a single paginated list call returns
MAX_PAGINATION_LENGTH = 25

# API (protobuf) page type -> database page type
pagetype2sql = {
    pages_pb2.PAGE_TYPE_PLACE: PageType.place,
    pages_pb2.PAGE_TYPE_GUIDE: PageType.guide,
    pages_pb2.PAGE_TYPE_MAIN_PAGE: PageType.main_page,
}

# database page type -> API (protobuf) page type; the exact inverse of pagetype2sql
pagetype2api = {sql_type: api_type for api_type, sql_type in pagetype2sql.items()}

24 

25 

def _is_page_owner(page: Page, user_id):
    """
    Tells whether the given user may act as an owner of the page

    A page that has an owning user is owned by that user only. Any other page is owned by a cluster, and every admin
    of that cluster counts as an owner.
    """
    if not page.owner_user:
        # no owning user, so the page is owned by a cluster: look the user up among its admins
        matching_admin = page.owner_cluster.admins.where(User.id == user_id).one_or_none()
        return matching_admin is not None
    return page.owner_user_id == user_id

34 

35 

def _can_moderate_page(page: Page, user_id):
    """
    Tells whether the given user is allowed to moderate the page

    The permission lookups run in their own session obtained from session_scope(). Three checks are tried in order:
    moderation rights at the location of the page's latest version, moderation rights over the parent node of the
    owning cluster, and finally moderation rights over the page's own parent node.
    """
    with session_scope() as session:
        geom = page.versions[-1].geom

        # a page with a location can be moderated by whoever can moderate at that location
        if geom is not None and can_moderate_at(session, user_id, geom):
            return True

        # a cluster-owned page can be moderated by whoever can moderate the owning cluster's parent node
        owner_cluster = page.owner_cluster
        if owner_cluster is not None and can_moderate_node(session, user_id, owner_cluster.parent_node_id):
            return True

        # otherwise it comes down to moderation rights over the page's own parent node
        return can_moderate_node(session, user_id, page.parent_node_id)

54 

55 

def page_to_pb(page: Page, context):
    """
    Builds the pages_pb2.Page API message for a page, as seen by the calling user (context.user_id)

    Title, content, photo, address and location come from the latest version of the page, the creation time from its
    first version. `can_edit` and `can_moderate` are computed for the calling user.
    """
    oldest = page.versions[0]
    newest = page.versions[-1]

    # a cluster owner is reported as a community (official cluster) or as a group (any other cluster)
    owner_community_id = None
    owner_group_id = None
    cluster = page.owner_cluster
    if cluster:
        if cluster.is_official_cluster:
            owner_community_id = cluster.parent_node_id
        else:
            owner_group_id = cluster.id

    can_moderate = _can_moderate_page(page, context.user_id)

    photo_url = newest.photo.full_url if newest.photo_key else None

    coordinates = newest.coordinates
    location = pages_pb2.Coordinate(lat=coordinates[0], lng=coordinates[1]) if coordinates else None

    # every editor once, in order of their first edit
    editor_user_ids = remove_duplicates_retain_order([version.editor_user_id for version in page.versions])

    # owners can always edit; moderators can edit too
    can_edit = _is_page_owner(page, context.user_id) or can_moderate

    return pages_pb2.Page(
        page_id=page.id,
        type=pagetype2api[page.type],
        slug=newest.slug,
        created=Timestamp_from_datetime(oldest.created),
        last_edited=Timestamp_from_datetime(newest.created),
        last_editor_user_id=newest.editor_user_id,
        creator_user_id=page.creator_user_id,
        owner_user_id=page.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(page.thread_id),
        title=newest.title,
        content=newest.content,
        photo_url=photo_url,
        address=newest.address,
        location=location,
        editor_user_ids=editor_user_ids,
        can_edit=can_edit,
        can_moderate=can_moderate,
    )

98 

99 

class Pages(pages_pb2_grpc.PagesServicer):
    """
    gRPC servicer for pages (places, guides and main pages)

    Every RPC works inside its own database session, reports invalid input or missing permissions through
    `context.abort`, and converts pages to API messages with `page_to_pb`.
    """

    @staticmethod
    def _geom_from_location(location, context):
        """
        Turns a request location into a geometry, aborting with INVALID_COORDINATE for the coordinate (0, 0)

        Shared by CreatePlace, CreateGuide and UpdatePage so that all three reject the same input.
        """
        # (0, 0) is rejected as invalid (presumably it marks an unset location; TODO confirm)
        if location.lat == 0 and location.lng == 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
        return create_coordinate(location.lat, location.lng)

    @staticmethod
    def _abort_if_photo_missing(session, photo_key, context):
        """
        Aborts with PHOTO_NOT_FOUND unless an Upload with the given key exists
        """
        if not session.execute(select(Upload).where(Upload.key == photo_key)).scalar_one_or_none():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)

    @staticmethod
    def _add_page(session, request, context, page_type, parent_node, address, geom):
        """
        Creates a page of the given type, created and owned by the calling user, plus its first version

        Title, content and photo key are taken from the request. The session is committed before the new page is
        returned.
        """
        page = Page(
            parent_node=parent_node,
            type=page_type,
            creator_user_id=context.user_id,
            owner_user_id=context.user_id,
            thread=Thread(),
        )
        session.add(page)
        # flush the new page before its first version is created
        session.flush()
        first_version = PageVersion(
            page=page,
            editor_user_id=context.user_id,
            title=request.title,
            content=request.content,
            # an empty photo key means "no photo"
            photo_key=request.photo_key or None,
            address=address,
            geom=geom,
        )
        session.add(first_version)
        session.commit()
        return page

    @staticmethod
    def _list_user_pages(request, context, page_type):
        """
        Fetches one batch of pages of the given type owned by a user, ordered by page id

        The user is `request.user_id`, or the calling user when that is not set. At most MAX_PAGINATION_LENGTH pages
        are returned per call.

        Returns a tuple `(pages, next_page_token)`: the pages as API messages, and the id of the first page of the
        next batch as a string, or None when there is no further batch.
        """
        with session_scope() as session:
            page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
            # NOTE(review): a non-numeric page_token raises ValueError here instead of aborting cleanly
            next_page_id = int(request.page_token) if request.page_token else 0
            user_id = request.user_id or context.user_id
            pages = (
                session.execute(
                    select(Page)
                    .where(Page.owner_user_id == user_id)
                    .where(Page.type == page_type)
                    .where(Page.id >= next_page_id)
                    .order_by(Page.id)
                    # one row more than requested: it only tells us whether a next batch exists
                    .limit(page_size + 1)
                )
                .scalars()
                .all()
            )
            return (
                [page_to_pb(page, context) for page in pages[:page_size]],
                str(pages[-1].id) if len(pages) > page_size else None,
            )

    def CreatePlace(self, request, context):
        """
        Creates a place owned by the calling user

        Title, content, address and a valid location are all required. The parent node is looked up from the
        location. Aborts with INVALID_ARGUMENT when a field is missing, the location is (0, 0), or the photo key does
        not match an upload.
        """
        if not request.title:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
        if not request.content:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)
        if not request.address:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS)
        if not request.HasField("location"):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_LOCATION)

        geom = self._geom_from_location(request.location, context)

        with session_scope() as session:
            if request.photo_key:
                self._abort_if_photo_missing(session, request.photo_key, context)

            page = self._add_page(
                session,
                request,
                context,
                page_type=PageType.place,
                parent_node=get_parent_node_at_location(session, geom),
                address=request.address,
                geom=geom,
            )
            return page_to_pb(page, context)

    def CreateGuide(self, request, context):
        """
        Creates a guide owned by the calling user, under the community given by `parent_community_id`

        Title, content and parent community are required. Address and location are optional, but must be given
        together. Aborts with INVALID_ARGUMENT when the input is invalid, the community does not exist, or the photo
        key does not match an upload.
        """
        if not request.title:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
        if not request.content:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)

        has_location = request.HasField("location")
        if request.address and has_location:
            address = request.address
            geom = self._geom_from_location(request.location, context)
        elif not request.address and not has_location:
            address = None
            geom = None
        else:
            # you have to have both or neither
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_GUIDE_LOCATION)

        if not request.parent_community_id:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_PARENT)

        with session_scope() as session:
            parent_node = session.execute(
                select(Node).where(Node.id == request.parent_community_id)
            ).scalar_one_or_none()

            if not parent_node:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)

            if request.photo_key:
                self._abort_if_photo_missing(session, request.photo_key, context)

            page = self._add_page(
                session,
                request,
                context,
                page_type=PageType.guide,
                parent_node=parent_node,
                address=address,
                geom=geom,
            )
            return page_to_pb(page, context)

    def GetPage(self, request, context):
        """
        Returns the page with the given id, aborting with NOT_FOUND when it does not exist
        """
        with session_scope() as session:
            page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none()
            if not page:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)

            return page_to_pb(page, context)

    def UpdatePage(self, request, context):
        """
        Edits a page by adding a new version to it; only page owners and moderators may do this

        The new version starts as a copy of the latest one, and only the fields set on the request are replaced.
        Title, content and address cannot be set to an empty value. An empty photo key removes the photo. A location
        of (0, 0) is rejected, exactly as when creating a page.
        """
        with session_scope() as session:
            page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none()
            if not page:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)

            if not _is_page_owner(page, context.user_id) and not _can_moderate_page(page, context.user_id):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_UPDATE_PERMISSION_DENIED)

            current_version = page.versions[-1]

            # start from a copy of the latest version, then apply the requested changes on top
            page_version = PageVersion(
                page=page,
                editor_user_id=context.user_id,
                title=current_version.title,
                content=current_version.content,
                photo_key=current_version.photo_key,
                address=current_version.address,
                geom=current_version.geom,
            )

            if request.HasField("title"):
                if not request.title.value:
                    context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
                page_version.title = request.title.value

            if request.HasField("content"):
                if not request.content.value:
                    context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)
                page_version.content = request.content.value

            if request.HasField("photo_key"):
                if not request.photo_key.value:
                    # set but empty: remove the photo
                    page_version.photo_key = None
                else:
                    self._abort_if_photo_missing(session, request.photo_key.value, context)
                    page_version.photo_key = request.photo_key.value

            if request.HasField("address"):
                if not request.address.value:
                    context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS)
                page_version.address = request.address.value

            if request.HasField("location"):
                # same coordinate validation as CreatePlace and CreateGuide
                page_version.geom = self._geom_from_location(request.location, context)

            session.add(page_version)
            session.commit()
            return page_to_pb(page, context)

    def TransferPage(self, request, context):
        """
        Hands a page over to a group or a community; only page owners and moderators may do this

        Main pages cannot be transferred: they are reported as NOT_FOUND. The new owner is the group with id
        `new_owner_group_id` (a non-official cluster) or the official cluster of the community
        `new_owner_community_id`. Afterwards the page no longer has an owning user.
        """
        with session_scope() as session:
            page = session.execute(
                select(Page).where(Page.id == request.page_id).where(Page.type != PageType.main_page)
            ).scalar_one_or_none()

            if not page:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)

            if not _is_page_owner(page, context.user_id) and not _can_moderate_page(page, context.user_id):
                context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_TRANSFER_PERMISSION_DENIED)

            new_owner = request.WhichOneof("new_owner")
            if new_owner == "new_owner_group_id":
                cluster = session.execute(
                    select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
                ).scalar_one_or_none()
            elif new_owner == "new_owner_community_id":
                cluster = session.execute(
                    select(Cluster)
                    .where(Cluster.parent_node_id == request.new_owner_community_id)
                    .where(Cluster.is_official_cluster)
                ).scalar_one_or_none()
            else:
                # neither field of the oneof is set
                context.abort(grpc.StatusCode.UNKNOWN, errors.UNKNOWN_ERROR)

            if not cluster:
                context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

            page.owner_user = None
            page.owner_cluster = cluster

            session.commit()
            return page_to_pb(page, context)

    def ListUserPlaces(self, request, context):
        """
        Lists the places owned by `request.user_id` (or by the calling user when not set), one batch at a time
        """
        places, next_page_token = self._list_user_pages(request, context, PageType.place)
        return pages_pb2.ListUserPlacesRes(places=places, next_page_token=next_page_token)

    def ListUserGuides(self, request, context):
        """
        Lists the guides owned by `request.user_id` (or by the calling user when not set), one batch at a time
        """
        guides, next_page_token = self._list_user_pages(request, context, PageType.guide)
        return pages_pb2.ListUserGuidesRes(guides=guides, next_page_token=next_page_token)