Coverage for src/couchers/servicers/pages.py: 95%
148 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-11-21 04:21 +0000
1import grpc
3from couchers import errors
4from couchers.db import can_moderate_at, can_moderate_node, get_parent_node_at_location
5from couchers.models import Cluster, Node, Page, PageType, PageVersion, Thread, Upload, User
6from couchers.servicers.threads import thread_to_pb
7from couchers.sql import couchers_select as select
8from couchers.utils import Timestamp_from_datetime, create_coordinate, remove_duplicates_retain_order
9from proto import pages_pb2, pages_pb2_grpc
11MAX_PAGINATION_LENGTH = 25
13pagetype2sql = {
14 pages_pb2.PAGE_TYPE_PLACE: PageType.place,
15 pages_pb2.PAGE_TYPE_GUIDE: PageType.guide,
16 pages_pb2.PAGE_TYPE_MAIN_PAGE: PageType.main_page,
17}
19pagetype2api = {
20 PageType.place: pages_pb2.PAGE_TYPE_PLACE,
21 PageType.guide: pages_pb2.PAGE_TYPE_GUIDE,
22 PageType.main_page: pages_pb2.PAGE_TYPE_MAIN_PAGE,
23}
26def _is_page_owner(page: Page, user_id):
27 """
28 Checks whether the user can act as an owner of the page
29 """
30 if page.owner_user:
31 return page.owner_user_id == user_id
32 # otherwise owned by a cluster
33 return page.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None
36def _can_moderate_page(session, page: Page, user_id):
37 """
38 Checks if the user is allowed to moderate this page
39 """
40 # checks if either the page is in the exclusive moderation area of a node
41 latest_version = page.versions[-1]
43 # if the page has a location, we firstly check if we are the moderator of any node that contains this page
44 if latest_version.geom is not None and can_moderate_at(session, user_id, latest_version.geom):
45 return True
47 # if the page is owned by a cluster, then any moderator of that cluster can moderate this page
48 if page.owner_cluster is not None and can_moderate_node(session, user_id, page.owner_cluster.parent_node_id):
49 return True
51 # finally check if the user can moderate the parent node of the cluster
52 return can_moderate_node(session, user_id, page.parent_node_id)
55def page_to_pb(session, page: Page, context):
56 first_version = page.versions[0]
57 current_version = page.versions[-1]
59 owner_community_id = None
60 owner_group_id = None
61 if page.owner_cluster:
62 if page.owner_cluster.is_official_cluster:
63 owner_community_id = page.owner_cluster.parent_node_id
64 else:
65 owner_group_id = page.owner_cluster.id
67 can_moderate = _can_moderate_page(session, page, context.user_id)
69 return pages_pb2.Page(
70 page_id=page.id,
71 type=pagetype2api[page.type],
72 slug=current_version.slug,
73 created=Timestamp_from_datetime(first_version.created),
74 last_edited=Timestamp_from_datetime(current_version.created),
75 last_editor_user_id=current_version.editor_user_id,
76 creator_user_id=page.creator_user_id,
77 owner_user_id=page.owner_user_id,
78 owner_community_id=owner_community_id,
79 owner_group_id=owner_group_id,
80 thread=thread_to_pb(session, page.thread_id),
81 title=current_version.title,
82 content=current_version.content,
83 photo_url=current_version.photo.full_url if current_version.photo_key else None,
84 address=current_version.address,
85 location=(
86 pages_pb2.Coordinate(
87 lat=current_version.coordinates[0],
88 lng=current_version.coordinates[1],
89 )
90 if current_version.coordinates
91 else None
92 ),
93 editor_user_ids=remove_duplicates_retain_order([version.editor_user_id for version in page.versions]),
94 can_edit=_is_page_owner(page, context.user_id) or can_moderate,
95 can_moderate=can_moderate,
96 )
99class Pages(pages_pb2_grpc.PagesServicer):
100 def CreatePlace(self, request, context, session):
101 if not request.title:
102 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
103 if not request.content:
104 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)
105 if not request.address:
106 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS)
107 if not request.HasField("location"):
108 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_LOCATION)
109 if request.location.lat == 0 and request.location.lng == 0:
110 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
112 geom = create_coordinate(request.location.lat, request.location.lng)
114 if (
115 request.photo_key
116 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
117 ):
118 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
120 page = Page(
121 parent_node=get_parent_node_at_location(session, geom),
122 type=PageType.place,
123 creator_user_id=context.user_id,
124 owner_user_id=context.user_id,
125 thread=Thread(),
126 )
127 session.add(page)
128 session.flush()
129 page_version = PageVersion(
130 page=page,
131 editor_user_id=context.user_id,
132 title=request.title,
133 content=request.content,
134 photo_key=request.photo_key if request.photo_key else None,
135 address=request.address,
136 geom=geom,
137 )
138 session.add(page_version)
139 session.commit()
140 return page_to_pb(session, page, context)
142 def CreateGuide(self, request, context, session):
143 if not request.title:
144 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
145 if not request.content:
146 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)
147 if request.address and request.HasField("location"):
148 address = request.address
149 if request.location.lat == 0 and request.location.lng == 0:
150 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
151 geom = create_coordinate(request.location.lat, request.location.lng)
152 elif not request.address and not request.HasField("location"):
153 address = None
154 geom = None
155 else:
156 # you have to have both or neither
157 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_GUIDE_LOCATION)
159 if not request.parent_community_id:
160 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_PARENT)
162 parent_node = session.execute(select(Node).where(Node.id == request.parent_community_id)).scalar_one_or_none()
164 if not parent_node:
165 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)
167 if (
168 request.photo_key
169 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
170 ):
171 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
173 page = Page(
174 parent_node=parent_node,
175 type=PageType.guide,
176 creator_user_id=context.user_id,
177 owner_user_id=context.user_id,
178 thread=Thread(),
179 )
180 session.add(page)
181 session.flush()
182 page_version = PageVersion(
183 page=page,
184 editor_user_id=context.user_id,
185 title=request.title,
186 content=request.content,
187 photo_key=request.photo_key if request.photo_key else None,
188 address=address,
189 geom=geom,
190 )
191 session.add(page_version)
192 session.commit()
193 return page_to_pb(session, page, context)
195 def GetPage(self, request, context, session):
196 page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none()
197 if not page:
198 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)
200 return page_to_pb(session, page, context)
202 def UpdatePage(self, request, context, session):
203 page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none()
204 if not page:
205 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)
207 if not _is_page_owner(page, context.user_id) and not _can_moderate_page(session, page, context.user_id):
208 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_UPDATE_PERMISSION_DENIED)
210 current_version = page.versions[-1]
212 page_version = PageVersion(
213 page=page,
214 editor_user_id=context.user_id,
215 title=current_version.title,
216 content=current_version.content,
217 photo_key=current_version.photo_key,
218 address=current_version.address,
219 geom=current_version.geom,
220 )
222 if request.HasField("title"):
223 if not request.title.value:
224 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
225 page_version.title = request.title.value
227 if request.HasField("content"):
228 if not request.content.value:
229 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)
230 page_version.content = request.content.value
232 if request.HasField("photo_key"):
233 if not request.photo_key.value:
234 page_version.photo_key = None
235 else:
236 if not session.execute(
237 select(Upload).where(Upload.key == request.photo_key.value)
238 ).scalar_one_or_none():
239 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
240 page_version.photo_key = request.photo_key.value
242 if request.HasField("address"):
243 if not request.address.value:
244 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS)
245 page_version.address = request.address.value
247 if request.HasField("location"):
248 page_version.geom = create_coordinate(request.location.lat, request.location.lng)
250 session.add(page_version)
251 session.commit()
252 return page_to_pb(session, page, context)
254 def TransferPage(self, request, context, session):
255 page = session.execute(
256 select(Page).where(Page.id == request.page_id).where(Page.type != PageType.main_page)
257 ).scalar_one_or_none()
259 if not page:
260 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)
262 if not _is_page_owner(page, context.user_id) and not _can_moderate_page(session, page, context.user_id):
263 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_TRANSFER_PERMISSION_DENIED)
265 if request.WhichOneof("new_owner") == "new_owner_group_id":
266 cluster = session.execute(
267 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
268 ).scalar_one_or_none()
269 elif request.WhichOneof("new_owner") == "new_owner_community_id":
270 cluster = session.execute(
271 select(Cluster)
272 .where(Cluster.parent_node_id == request.new_owner_community_id)
273 .where(Cluster.is_official_cluster)
274 ).scalar_one_or_none()
275 else:
276 # i'm not sure if this needs to be checked
277 context.abort(grpc.StatusCode.UNKNOWN, errors.UNKNOWN_ERROR)
279 if not cluster:
280 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
282 page.owner_user = None
283 page.owner_cluster = cluster
285 session.commit()
286 return page_to_pb(session, page, context)
288 def ListUserPlaces(self, request, context, session):
289 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
290 next_page_id = int(request.page_token) if request.page_token else 0
291 user_id = request.user_id or context.user_id
292 places = (
293 session.execute(
294 select(Page)
295 .where(Page.owner_user_id == user_id)
296 .where(Page.type == PageType.place)
297 .where(Page.id >= next_page_id)
298 .order_by(Page.id)
299 .limit(page_size + 1)
300 )
301 .scalars()
302 .all()
303 )
304 return pages_pb2.ListUserPlacesRes(
305 places=[page_to_pb(session, page, context) for page in places[:page_size]],
306 next_page_token=str(places[-1].id) if len(places) > page_size else None,
307 )
309 def ListUserGuides(self, request, context, session):
310 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
311 next_page_id = int(request.page_token) if request.page_token else 0
312 user_id = request.user_id or context.user_id
313 guides = (
314 session.execute(
315 select(Page)
316 .where(Page.owner_user_id == user_id)
317 .where(Page.type == PageType.guide)
318 .where(Page.id >= next_page_id)
319 .order_by(Page.id)
320 .limit(page_size + 1)
321 )
322 .scalars()
323 .all()
324 )
325 return pages_pb2.ListUserGuidesRes(
326 guides=[page_to_pb(session, page, context) for page in guides[:page_size]],
327 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
328 )