Coverage for src/couchers/servicers/pages.py: 95%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import grpc
3from couchers import errors
4from couchers.db import can_moderate_at, can_moderate_node, get_parent_node_at_location, session_scope
5from couchers.models import Cluster, Node, Page, PageType, PageVersion, Thread, Upload, User
6from couchers.servicers.threads import thread_to_pb
7from couchers.sql import couchers_select as select
8from couchers.utils import Timestamp_from_datetime, create_coordinate, remove_duplicates_retain_order
9from proto import pages_pb2, pages_pb2_grpc
11MAX_PAGINATION_LENGTH = 25
13pagetype2sql = {
14 pages_pb2.PAGE_TYPE_PLACE: PageType.place,
15 pages_pb2.PAGE_TYPE_GUIDE: PageType.guide,
16 pages_pb2.PAGE_TYPE_MAIN_PAGE: PageType.main_page,
17}
19pagetype2api = {
20 PageType.place: pages_pb2.PAGE_TYPE_PLACE,
21 PageType.guide: pages_pb2.PAGE_TYPE_GUIDE,
22 PageType.main_page: pages_pb2.PAGE_TYPE_MAIN_PAGE,
23}
26def _is_page_owner(page: Page, user_id):
27 """
28 Checks whether the user can act as an owner of the page
29 """
30 if page.owner_user:
31 return page.owner_user_id == user_id
32 # otherwise owned by a cluster
33 return page.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None
36def _can_moderate_page(page: Page, user_id):
37 """
38 Checks if the user is allowed to moderate this page
39 """
40 # checks if either the page is in the exclusive moderation area of a node
41 with session_scope() as session:
42 latest_version = page.versions[-1]
44 # if the page has a location, we firstly check if we are the moderator of any node that contains this page
45 if latest_version.geom is not None and can_moderate_at(session, user_id, latest_version.geom):
46 return True
48 # if the page is owned by a cluster, then any moderator of that cluster can moderate this page
49 if page.owner_cluster is not None and can_moderate_node(session, user_id, page.owner_cluster.parent_node_id):
50 return True
52 # finally check if the user can moderate the parent node of the cluster
53 return can_moderate_node(session, user_id, page.parent_node_id)
56def page_to_pb(page: Page, context):
57 first_version = page.versions[0]
58 current_version = page.versions[-1]
60 owner_community_id = None
61 owner_group_id = None
62 if page.owner_cluster:
63 if page.owner_cluster.is_official_cluster:
64 owner_community_id = page.owner_cluster.parent_node_id
65 else:
66 owner_group_id = page.owner_cluster.id
68 return pages_pb2.Page(
69 page_id=page.id,
70 type=pagetype2api[page.type],
71 slug=current_version.slug,
72 created=Timestamp_from_datetime(first_version.created),
73 last_edited=Timestamp_from_datetime(current_version.created),
74 last_editor_user_id=current_version.editor_user_id,
75 creator_user_id=page.creator_user_id,
76 owner_user_id=page.owner_user_id,
77 owner_community_id=owner_community_id,
78 owner_group_id=owner_group_id,
79 thread=thread_to_pb(page.thread_id),
80 title=current_version.title,
81 content=current_version.content,
82 photo_url=current_version.photo.full_url if current_version.photo_key else None,
83 address=current_version.address,
84 location=pages_pb2.Coordinate(
85 lat=current_version.coordinates[0],
86 lng=current_version.coordinates[1],
87 )
88 if current_version.coordinates
89 else None,
90 editor_user_ids=remove_duplicates_retain_order([version.editor_user_id for version in page.versions]),
91 can_edit=_is_page_owner(page, context.user_id),
92 can_moderate=_can_moderate_page(page, context.user_id),
93 )
96class Pages(pages_pb2_grpc.PagesServicer):
97 def CreatePlace(self, request, context):
98 if not request.title:
99 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
100 if not request.content:
101 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)
102 if not request.address:
103 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS)
104 if not request.HasField("location"):
105 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_LOCATION)
106 if request.location.lat == 0 and request.location.lng == 0:
107 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
109 geom = create_coordinate(request.location.lat, request.location.lng)
111 with session_scope() as session:
112 if (
113 request.photo_key
114 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
115 ):
116 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
118 page = Page(
119 parent_node=get_parent_node_at_location(session, geom),
120 type=PageType.place,
121 creator_user_id=context.user_id,
122 owner_user_id=context.user_id,
123 thread=Thread(),
124 )
125 session.add(page)
126 session.flush()
127 page_version = PageVersion(
128 page=page,
129 editor_user_id=context.user_id,
130 title=request.title,
131 content=request.content,
132 photo_key=request.photo_key if request.photo_key else None,
133 address=request.address,
134 geom=geom,
135 )
136 session.add(page_version)
137 session.commit()
138 return page_to_pb(page, context)
140 def CreateGuide(self, request, context):
141 if not request.title:
142 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
143 if not request.content:
144 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)
145 if request.address and request.HasField("location"):
146 address = request.address
147 if request.location.lat == 0 and request.location.lng == 0:
148 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
149 geom = create_coordinate(request.location.lat, request.location.lng)
150 elif not request.address and not request.HasField("location"):
151 address = None
152 geom = None
153 else:
154 # you have to have both or neither
155 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_GUIDE_LOCATION)
157 if not request.parent_community_id:
158 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_PARENT)
160 with session_scope() as session:
161 parent_node = session.execute(
162 select(Node).where(Node.id == request.parent_community_id)
163 ).scalar_one_or_none()
165 if not parent_node:
166 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)
168 if (
169 request.photo_key
170 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
171 ):
172 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
174 page = Page(
175 parent_node=parent_node,
176 type=PageType.guide,
177 creator_user_id=context.user_id,
178 owner_user_id=context.user_id,
179 thread=Thread(),
180 )
181 session.add(page)
182 session.flush()
183 page_version = PageVersion(
184 page=page,
185 editor_user_id=context.user_id,
186 title=request.title,
187 content=request.content,
188 photo_key=request.photo_key if request.photo_key else None,
189 address=address,
190 geom=geom,
191 )
192 session.add(page_version)
193 session.commit()
194 return page_to_pb(page, context)
196 def GetPage(self, request, context):
197 with session_scope() as session:
198 page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none()
199 if not page:
200 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)
202 return page_to_pb(page, context)
204 def UpdatePage(self, request, context):
205 with session_scope() as session:
206 page = session.execute(select(Page).where(Page.id == request.page_id)).scalar_one_or_none()
207 if not page:
208 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)
210 if not _is_page_owner(page, context.user_id) and not _can_moderate_page(page, context.user_id):
211 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_UPDATE_PERMISSION_DENIED)
213 current_version = page.versions[-1]
215 page_version = PageVersion(
216 page=page,
217 editor_user_id=context.user_id,
218 title=current_version.title,
219 content=current_version.content,
220 photo_key=current_version.photo_key,
221 address=current_version.address,
222 geom=current_version.geom,
223 )
225 if request.HasField("title"):
226 if not request.title.value:
227 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_TITLE)
228 page_version.title = request.title.value
230 if request.HasField("content"):
231 if not request.content.value:
232 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_CONTENT)
233 page_version.content = request.content.value
235 if request.HasField("photo_key"):
236 if not request.photo_key.value:
237 page_version.photo_key = None
238 else:
239 if not session.execute(
240 select(Upload).where(Upload.key == request.photo_key.value)
241 ).scalar_one_or_none():
242 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
243 page_version.photo_key = request.photo_key.value
245 if request.HasField("address"):
246 if not request.address.value:
247 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_PAGE_ADDRESS)
248 page_version.address = request.address.value
250 if request.HasField("location"):
251 page_version.geom = create_coordinate(request.location.lat, request.location.lng)
253 session.add(page_version)
254 session.commit()
255 return page_to_pb(page, context)
257 def TransferPage(self, request, context):
258 with session_scope() as session:
259 page = session.execute(
260 select(Page).where(Page.id == request.page_id).where(Page.type != PageType.main_page)
261 ).scalar_one_or_none()
263 if not page:
264 context.abort(grpc.StatusCode.NOT_FOUND, errors.PAGE_NOT_FOUND)
266 if not _is_page_owner(page, context.user_id) and not _can_moderate_page(page, context.user_id):
267 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.PAGE_TRANSFER_PERMISSION_DENIED)
269 if request.WhichOneof("new_owner") == "new_owner_group_id":
270 cluster = session.execute(
271 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
272 ).scalar_one_or_none()
273 elif request.WhichOneof("new_owner") == "new_owner_community_id":
274 cluster = session.execute(
275 select(Cluster)
276 .where(Cluster.parent_node_id == request.new_owner_community_id)
277 .where(Cluster.is_official_cluster)
278 ).scalar_one_or_none()
279 else:
280 # i'm not sure if this needs to be checked
281 context.abort(grpc.StatusCode.UNKNOWN, errors.UNKNOWN_ERROR)
283 if not cluster:
284 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
286 page.owner_user = None
287 page.owner_cluster = cluster
289 session.commit()
290 return page_to_pb(page, context)
292 def ListUserPlaces(self, request, context):
293 with session_scope() as session:
294 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
295 next_page_id = int(request.page_token) if request.page_token else 0
296 user_id = request.user_id or context.user_id
297 places = (
298 session.execute(
299 select(Page)
300 .where(Page.owner_user_id == user_id)
301 .where(Page.type == PageType.place)
302 .where(Page.id >= next_page_id)
303 .order_by(Page.id)
304 .limit(page_size + 1)
305 )
306 .scalars()
307 .all()
308 )
309 return pages_pb2.ListUserPlacesRes(
310 places=[page_to_pb(page, context) for page in places[:page_size]],
311 next_page_token=str(places[-1].id) if len(places) > page_size else None,
312 )
314 def ListUserGuides(self, request, context):
315 with session_scope() as session:
316 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
317 next_page_id = int(request.page_token) if request.page_token else 0
318 user_id = request.user_id or context.user_id
319 guides = (
320 session.execute(
321 select(Page)
322 .where(Page.owner_user_id == user_id)
323 .where(Page.type == PageType.guide)
324 .where(Page.id >= next_page_id)
325 .order_by(Page.id)
326 .limit(page_size + 1)
327 )
328 .scalars()
329 .all()
330 )
331 return pages_pb2.ListUserGuidesRes(
332 guides=[page_to_pb(page, context) for page in guides[:page_size]],
333 next_page_token=str(guides[-1].id) if len(guides) > page_size else None,
334 )