Coverage for src/couchers/servicers/pages.py: 95%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

155 statements  

1import grpc 

2 

3from couchers import errors 

4from couchers.db import can_moderate_at, can_moderate_node, get_parent_node_at_location, session_scope 

5from couchers.models import Cluster, Node, Page, PageType, PageVersion, Thread, Upload, User 

6from couchers.servicers.threads import thread_to_pb 

7from couchers.sql import couchers_select as select 

8from couchers.utils import Timestamp_from_datetime, create_coordinate, remove_duplicates_retain_order 

9from proto import pages_pb2, pages_pb2_grpc 

10 

# largest number of pages a single list call returns; also used when the
# request does not specify a page size
MAX_PAGINATION_LENGTH = 25

# API page type enum value -> database PageType
pagetype2sql = {
    pages_pb2.PAGE_TYPE_PLACE: PageType.place,
    pages_pb2.PAGE_TYPE_GUIDE: PageType.guide,
    pages_pb2.PAGE_TYPE_MAIN_PAGE: PageType.main_page,
}

# database PageType -> API page type enum value (exact inverse of pagetype2sql)
pagetype2api = {sql_type: api_type for api_type, sql_type in pagetype2sql.items()}

24 

25 

def _is_page_owner(page: Page, user_id):
    """
    Tells whether the given user may act as an owner of the page

    A page with an owning user is owned by exactly that user. Any other page is
    treated as cluster-owned, and every admin of the owning cluster counts as an owner.
    """
    if not page.owner_user:
        # no owning user: look for this user among the admins of the owning cluster
        matching_admin = page.owner_cluster.admins.where(User.id == user_id).one_or_none()
        return matching_admin is not None

    return page.owner_user_id == user_id

34 

35 

def _can_moderate_page(page: Page, user_id):
    """
    Tells whether the user is allowed to moderate this page

    Permission is granted by the first of these checks that succeeds:

    1. the latest version of the page has a location and can_moderate_at accepts the user there
    2. the page is owned by a cluster and the user can moderate that cluster's parent node
    3. the user can moderate the parent node of the page itself
    """
    with session_scope() as session:
        geom = page.versions[-1].geom

        # 1. location based: only applies to pages whose latest version has a geometry
        if geom is not None:
            if can_moderate_at(session, user_id, geom):
                return True

        # 2. ownership based: moderators of the owning cluster's parent node
        cluster = page.owner_cluster
        if cluster is not None:
            if can_moderate_node(session, user_id, cluster.parent_node_id):
                return True

        # 3. fall back to the parent node of the page
        return can_moderate_node(session, user_id, page.parent_node_id)

54 

55 

def page_to_pb(page: Page, context):
    """
    Builds the pages_pb2.Page message for a page, as seen by the calling user

    Content fields (slug, title, content, photo, address, location) come from the
    latest version; `created` comes from the first version. `can_edit` and
    `can_moderate` are computed for `context.user_id`.
    """
    versions = page.versions
    oldest = versions[0]
    newest = versions[-1]

    # a cluster owner is reported as a community when it is the official cluster
    # of its node, and as a group otherwise
    owner_community_id = owner_group_id = None
    cluster = page.owner_cluster
    if cluster:
        if cluster.is_official_cluster:
            owner_community_id = cluster.parent_node_id
        else:
            owner_group_id = cluster.id

    # optional fields stay None when the latest version does not carry them
    photo_url = newest.photo.full_url if newest.photo_key else None

    location = None
    if newest.coordinates:
        location = pages_pb2.Coordinate(
            lat=newest.coordinates[0],
            lng=newest.coordinates[1],
        )

    # every editor once, in the order of their first edit
    editor_user_ids = remove_duplicates_retain_order([v.editor_user_id for v in versions])

    return pages_pb2.Page(
        page_id=page.id,
        type=pagetype2api[page.type],
        slug=newest.slug,
        created=Timestamp_from_datetime(oldest.created),
        last_edited=Timestamp_from_datetime(newest.created),
        last_editor_user_id=newest.editor_user_id,
        creator_user_id=page.creator_user_id,
        owner_user_id=page.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(page.thread_id),
        title=newest.title,
        content=newest.content,
        photo_url=photo_url,
        address=newest.address,
        location=location,
        editor_user_ids=editor_user_ids,
        can_edit=_is_page_owner(page, context.user_id),
        can_moderate=_can_moderate_page(page, context.user_id),
    )

94 

95 

96class Pages(pages_pb2_grpc.PagesServicer): 

97 def CreatePlace(self, request, context): 

98 if not request.title: 

99 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE) 

100 if not request.content: 

101 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT) 

102 if not request.address: 

103 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS) 

104 if not request.HasField("location"): 

105 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_LOCATION) 

106 if request.location.lat == 0 and request.location.lng == 0: 

107 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

108 

109 geom = create_coordinate(request.location.lat, request.location.lng) 

110 

111 with session_scope() as session: 

112 if ( 

113 request.photo_key 

114 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

115 ): 

116 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

117 

118 page = Page( 

119 parent_node=get_parent_node_at_location(session, geom), 

120 type=PageType.place, 

121 creator_user_id=context.user_id, 

122 owner_user_id=context.user_id, 

123 thread=Thread(), 

124 ) 

125 session.add(page) 

126 session.flush() 

127 page_version = PageVersion( 

128 page=page, 

129 editor_user_id=context.user_id, 

130 title=request.title, 

131 content=request.content, 

132 photo_key=request.photo_key if request.photo_key else None, 

133 address=request.address, 

134 geom=geom, 

135 ) 

136 session.add(page_version) 

137 session.commit() 

138 return page_to_pb(page, context) 

139 

140 def CreateGuide(self, request, context): 

141 if not request.title: 

142 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE) 

143 if not request.content: 

144 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT) 

145 if request.address and request.HasField("location"): 

146 address = request.address 

147 if request.location.lat == 0 and request.location.lng == 0: 

148 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

149 geom = create_coordinate(request.location.lat, request.location.lng) 

150 elif not request.address and not request.HasField("location"): 

151 address = None 

152 geom = None 

153 else: 

154 # you have to have both or neither 

155 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_GUIDE_LOCATION) 

156 

157 if not request.parent_community_id: 

158 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_PARENT) 

159 

160 with session_scope() as session: 

161 parent_node = session.execute( 

162 select(Node).where(Node.id == request.parent_community_id) 

163 ).scalar_one_or_none() 

164 

165 if not parent_node: 

166 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND) 

167 

168 if ( 

169 request.photo_key 

170 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

171 ): 

172 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

173 

174 page = Page( 

175 parent_node=parent_node, 

176 type=PageType.guide, 

177 creator_user_id=context.user_id, 

178 owner_user_id=context.user_id, 

179 thread=Thread(), 

180 ) 

181 session.add(page) 

182 session.flush() 

183 page_version = PageVersion( 

184 page=page, 

185 editor_user_id=context.user_id, 

186 title=request.title, 

187 content=request.content, 

188 photo_key=request.photo_key if request.photo_key else None, 

189 address=address, 

190 geom=geom, 

191 ) 

192 session.add(page_version) 

193 session.commit() 

194 return page_to_pb(page, context) 

195 

196 def GetPage(self, request, context): 

197 with session_scope() as session: 

198 page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none() 

199 if not page: 

200 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND) 

201 

202 return page_to_pb(page, context) 

203 

204 def UpdatePage(self, request, context): 

205 with session_scope() as session: 

206 page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none() 

207 if not page: 

208 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND) 

209 

210 if not _is_page_owner(page, context.user_id) and not _can_moderate_page(page, context.user_id): 

211 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_UPDATE_PERMISSION_DENIED) 

212 

213 current_version = page.versions[-1] 

214 

215 page_version = PageVersion( 

216 page=page, 

217 editor_user_id=context.user_id, 

218 title=current_version.title, 

219 content=current_version.content, 

220 photo_key=current_version.photo_key, 

221 address=current_version.address, 

222 geom=current_version.geom, 

223 ) 

224 

225 if request.HasField("title"): 

226 if not request.title.value: 

227 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE) 

228 page_version.title = request.title.value 

229 

230 if request.HasField("content"): 

231 if not request.content.value: 

232 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT) 

233 page_version.content = request.content.value 

234 

235 if request.HasField("photo_key"): 

236 if not request.photo_key.value: 

237 page_version.photo_key = None 

238 else: 

239 if not session.execute( 

240 select(Upload).where(Upload.key == request.photo_key.value) 

241 ).scalar_one_or_none(): 

242 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

243 page_version.photo_key = request.photo_key.value 

244 

245 if request.HasField("address"): 

246 if not request.address.value: 

247 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS) 

248 page_version.address = request.address.value 

249 

250 if request.HasField("location"): 

251 page_version.geom = create_coordinate(request.location.lat, request.location.lng) 

252 

253 session.add(page_version) 

254 session.commit() 

255 return page_to_pb(page, context) 

256 

257 def TransferPage(self, request, context): 

258 with session_scope() as session: 

259 page = session.execute( 

260 select(Page).where(Page.id == request.page_id).where(Page.type != PageType.main_page) 

261 ).scalar_one_or_none() 

262 

263 if not page: 

264 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND) 

265 

266 if not _is_page_owner(page, context.user_id) and not _can_moderate_page(page, context.user_id): 

267 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_TRANSFER_PERMISSION_DENIED) 

268 

269 if request.WhichOneof("new_owner") == "new_owner_group_id": 

270 cluster = session.execute( 

271 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id) 

272 ).scalar_one_or_none() 

273 elif request.WhichOneof("new_owner") == "new_owner_community_id": 

274 cluster = session.execute( 

275 select(Cluster) 

276 .where(Cluster.parent_node_id == request.new_owner_community_id) 

277 .where(Cluster.is_official_cluster) 

278 ).scalar_one_or_none() 

279 else: 

280 # i'm not sure if this needs to be checked 

281 context.abort(grpc.StatusCode.UNKNOWN, errors.UNKNOWN_ERROR) 

282 

283 if not cluster: 

284 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND) 

285 

286 page.owner_user = None 

287 page.owner_cluster = cluster 

288 

289 session.commit() 

290 return page_to_pb(page, context) 

291 

292 def ListUserPlaces(self, request, context): 

293 with session_scope() as session: 

294 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

295 next_page_id = int(request.page_token) if request.page_token else 0 

296 user_id = request.user_id or context.user_id 

297 places = ( 

298 session.execute( 

299 select(Page) 

300 .where(Page.owner_user_id == user_id) 

301 .where(Page.type == PageType.place) 

302 .where(Page.id >= next_page_id) 

303 .order_by(Page.id) 

304 .limit(page_size + 1) 

305 ) 

306 .scalars() 

307 .all() 

308 ) 

309 return pages_pb2.ListUserPlacesRes( 

310 places=[page_to_pb(page, context) for page in places[:page_size]], 

311 next_page_token=str(places[-1].id) if len(places) > page_size else None, 

312 ) 

313 

314 def ListUserGuides(self, request, context): 

315 with session_scope() as session: 

316 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

317 next_page_id = int(request.page_token) if request.page_token else 0 

318 user_id = request.user_id or context.user_id 

319 guides = ( 

320 session.execute( 

321 select(Page) 

322 .where(Page.owner_user_id == user_id) 

323 .where(Page.type == PageType.guide) 

324 .where(Page.id >= next_page_id) 

325 .order_by(Page.id) 

326 .limit(page_size + 1) 

327 ) 

328 .scalars() 

329 .all() 

330 ) 

331 return pages_pb2.ListUserGuidesRes( 

332 guides=[page_to_pb(page, context) for page in guides[:page_size]], 

333 next_page_token=str(guides[-1].id) if len(guides) > page_size else None, 

334 )