Coverage for app/backend/src/couchers/servicers/editor.py: 83%
173 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import json
2import logging
4import grpc
5from geoalchemy2.shape import from_shape
6from google.protobuf import empty_pb2
7from shapely.geometry import shape
8from shapely.geometry.base import BaseGeometry
9from sqlalchemy import select
10from sqlalchemy.orm import Session
11from sqlalchemy.sql import exists, update
13from couchers import urls
14from couchers.context import CouchersContext
15from couchers.db import session_scope
16from couchers.helpers.clusters import CHILD_NODE_TYPE, create_cluster, create_node
17from couchers.jobs.enqueue import queue_job
18from couchers.materialized_views import LiteUser
19from couchers.models import EventCommunityInviteRequest, Node, User, Volunteer
20from couchers.models.notifications import NotificationTopicAction
21from couchers.models.postal_verification import PostalVerificationAttempt
22from couchers.notifications.notify import notify
23from couchers.postal.my_postcard import download_pdf
24from couchers.proto import communities_pb2, editor_pb2, editor_pb2_grpc, notification_data_pb2, postal_verification_pb2
25from couchers.proto.internal import jobs_pb2
26from couchers.resources import get_static_badge_dict
27from couchers.servicers.communities import community_to_pb
28from couchers.servicers.events import generate_event_create_notifications, get_users_to_notify_for_new_event
29from couchers.servicers.postal_verification import postalverificationstatus2pb
30from couchers.servicers.public import format_volunteer_link
31from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound on the page size of the paginated list RPCs in this module; it is
# also the page size used when a request does not specify one.
MAX_PAGINATION_LENGTH = 250
def load_community_geom(geojson: str, context: CouchersContext) -> BaseGeometry:
    """Parse a GeoJSON string into a shapely geometry for a community.

    The geometry must be a ``MultiPolygon``; for any other geometry type the
    call is aborted with ``INVALID_ARGUMENT`` / ``admin:no_multipolygon``.
    """
    parsed_geojson = json.loads(geojson)
    geometry = shape(parsed_geojson)

    is_multipolygon = geometry.geom_type == "MultiPolygon"
    if not is_multipolygon:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:no_multipolygon")

    return geometry
def volunteer_to_pb(session: Session, volunteer: Volunteer) -> editor_pb2.Volunteer:
    """Build the editor ``Volunteer`` protobuf message for a volunteer row.

    The matching ``LiteUser`` is loaded with ``scalar_one`` (so exactly one row
    is expected). The volunteer's display name and display location take
    precedence; the user's own name and city are used when those are unset.
    """
    user = session.execute(select(LiteUser).where(LiteUser.id == volunteer.user_id)).scalar_one()
    board_member_ids = set(get_static_badge_dict()["board_member"])

    # Only produce an avatar URL when the user actually has an avatar file.
    avatar_url = None
    if user.avatar_filename:
        avatar_url = urls.media_url(filename=user.avatar_filename, size="thumbnail")

    # The stop date is optional; leave the field unset while still volunteering.
    stopped = None
    if volunteer.stopped_volunteering:
        stopped = date_to_api(volunteer.stopped_volunteering)

    link_fields = format_volunteer_link(volunteer, user.username)

    return editor_pb2.Volunteer(
        user_id=volunteer.user_id,
        name=volunteer.display_name or user.name,
        username=user.username,
        is_board_member=user.id in board_member_ids,
        role=volunteer.role,
        location=volunteer.display_location or user.city,
        img=avatar_url,
        sort_key=volunteer.sort_key,
        started_volunteering=date_to_api(volunteer.started_volunteering),
        stopped_volunteering=stopped,
        show_on_team_page=volunteer.show_on_team_page,
        **link_fields,
    )
def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload) -> None:
    """Job body: send a "new blog post" notification to every visible user.

    The blog post URL doubles as the notification key.
    """
    with session_scope() as session:
        visible_user_ids = session.execute(select(User.id).where(User.is_visible)).scalars().all()
        for recipient_id in visible_user_ids:
            # A fresh message is built for each recipient, as in the original flow.
            blog_post = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.general__new_blog_post,
                key=payload.url,
                data=blog_post,
            )
85class Editor(editor_pb2_grpc.EditorServicer):
86 def CreateCommunity(
87 self, request: editor_pb2.CreateCommunityReq, context: CouchersContext, session: Session
88 ) -> communities_pb2.Community:
89 geom = load_community_geom(request.geojson, context)
91 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
92 if parent_node_id is not None: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true
93 parent_node = session.execute(select(Node).where(Node.id == parent_node_id)).scalar_one_or_none()
94 if not parent_node:
95 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:parent_node_not_found")
96 parent_type = parent_node.node_type
97 else:
98 parent_type = None
99 node_type = CHILD_NODE_TYPE[parent_type]
100 node = create_node(session, geom, parent_node_id, node_type)
101 create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)
103 return community_to_pb(session, node, context)
105 def UpdateCommunity(
106 self, request: editor_pb2.UpdateCommunityReq, context: CouchersContext, session: Session
107 ) -> communities_pb2.Community:
108 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
109 if not node:
110 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
111 cluster = node.official_cluster
113 if request.name: 113 ↛ 116line 113 didn't jump to line 116 because the condition on line 113 was always true
114 cluster.name = request.name
116 if request.description: 116 ↛ 119line 116 didn't jump to line 119 because the condition on line 116 was always true
117 cluster.description = request.description
119 if request.geojson: 119 ↛ 124line 119 didn't jump to line 124 because the condition on line 119 was always true
120 geom = load_community_geom(request.geojson, context)
122 node.geom = from_shape(geom)
124 if request.parent_node_id != 0:
125 node.parent_node_id = request.parent_node_id
127 session.flush()
129 return community_to_pb(session, cluster.parent_node, context)
131 def ListEventCommunityInviteRequests(
132 self, request: editor_pb2.ListEventCommunityInviteRequestsReq, context: CouchersContext, session: Session
133 ) -> editor_pb2.ListEventCommunityInviteRequestsRes:
134 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
135 next_request_id = int(request.page_token) if request.page_token else 0
136 requests = (
137 session.execute(
138 select(EventCommunityInviteRequest)
139 .where(EventCommunityInviteRequest.approved.is_(None))
140 .where(EventCommunityInviteRequest.id >= next_request_id)
141 .order_by(EventCommunityInviteRequest.id)
142 .limit(page_size + 1)
143 )
144 .scalars()
145 .all()
146 )
148 def _request_to_pb(request: EventCommunityInviteRequest) -> editor_pb2.EventCommunityInviteRequest:
149 users_to_notify, node_id = get_users_to_notify_for_new_event(session, request.occurrence)
150 return editor_pb2.EventCommunityInviteRequest(
151 event_community_invite_request_id=request.id,
152 user_id=request.user_id,
153 event_url=urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug),
154 approx_users_to_notify=len(users_to_notify),
155 community_id=node_id,
156 )
158 return editor_pb2.ListEventCommunityInviteRequestsRes(
159 requests=[_request_to_pb(request) for request in requests[:page_size]],
160 next_page_token=str(requests[-1].id) if len(requests) > page_size else None,
161 )
163 def DecideEventCommunityInviteRequest(
164 self, request: editor_pb2.DecideEventCommunityInviteRequestReq, context: CouchersContext, session: Session
165 ) -> editor_pb2.DecideEventCommunityInviteRequestRes:
166 req = session.execute(
167 select(EventCommunityInviteRequest).where(
168 EventCommunityInviteRequest.id == request.event_community_invite_request_id
169 )
170 ).scalar_one_or_none()
172 if not req: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true
173 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:event_community_invite_not_found")
175 if req.decided: 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true
176 context.abort_with_error_code(
177 grpc.StatusCode.FAILED_PRECONDITION, "admin:event_community_invite_already_decided"
178 )
180 decided = now()
181 req.decided = decided
182 req.decided_by_user_id = context.user_id
183 req.approved = request.approve
185 # deny other reqs for the same event
186 if request.approve:
187 session.execute(
188 update(EventCommunityInviteRequest)
189 .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
190 .where(EventCommunityInviteRequest.decided.is_(None))
191 .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
192 )
194 session.flush()
196 if request.approve:
197 queue_job(
198 session,
199 job=generate_event_create_notifications,
200 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
201 inviting_user_id=req.user_id,
202 occurrence_id=req.occurrence_id,
203 approved=True,
204 ),
205 )
207 return editor_pb2.DecideEventCommunityInviteRequestRes()
209 def SendBlogPostNotification(
210 self, request: editor_pb2.SendBlogPostNotificationReq, context: CouchersContext, session: Session
211 ) -> empty_pb2.Empty:
212 if len(request.title) > 50: 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true
213 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:blog_title_too_long")
214 if len(request.blurb) > 100: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true
215 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:blog_blurb_too_long")
216 queue_job(
217 session,
218 job=generate_new_blog_post_notifications,
219 payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload(
220 url=request.url,
221 title=request.title,
222 blurb=request.blurb,
223 ),
224 )
225 return empty_pb2.Empty()
227 def MakeUserVolunteer(
228 self, request: editor_pb2.MakeUserVolunteerReq, context: CouchersContext, session: Session
229 ) -> editor_pb2.Volunteer:
230 # Check if user exists
231 if not session.execute(select(exists().where(User.id == request.user_id))).scalar():
232 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
234 # Check if user is already a volunteer
235 if session.execute(select(exists().where(Volunteer.user_id == request.user_id))).scalar():
236 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_already_volunteer")
238 # Parse started_volunteering date
239 started_volunteering = None
240 if request.started_volunteering:
241 started_volunteering = parse_date(request.started_volunteering)
242 if not started_volunteering:
243 context.abort_with_error_code(
244 grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_started_volunteering_date"
245 )
247 # Create a volunteer record
248 volunteer = Volunteer(
249 user_id=request.user_id,
250 role=request.role,
251 show_on_team_page=not request.hide_on_team_page,
252 )
253 if started_volunteering:
254 volunteer.started_volunteering = started_volunteering
255 session.add(volunteer)
256 session.flush()
258 return volunteer_to_pb(session, volunteer)
260 def UpdateVolunteer(
261 self, request: editor_pb2.UpdateVolunteerReq, context: CouchersContext, session: Session
262 ) -> editor_pb2.Volunteer:
263 # Check if volunteer exists
264 volunteer = session.execute(select(Volunteer).where(Volunteer.user_id == request.user_id)).scalar_one_or_none()
265 if not volunteer:
266 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:volunteer_not_found")
268 # Update role if provided
269 if request.HasField("role"):
270 volunteer.role = request.role.value
272 # Update sort_key if provided
273 if request.HasField("sort_key"):
274 volunteer.sort_key = request.sort_key.value
276 # Update started_volunteering if provided
277 if request.HasField("started_volunteering"):
278 started_volunteering = parse_date(request.started_volunteering.value)
279 if not started_volunteering:
280 context.abort_with_error_code(
281 grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_started_volunteering_date"
282 )
283 volunteer.started_volunteering = started_volunteering
285 # Reinstate (clear stopped_volunteering) or update stopped_volunteering
286 if request.reinstate_volunteer and request.HasField("stopped_volunteering"):
287 context.abort_with_error_code(
288 grpc.StatusCode.INVALID_ARGUMENT, "admin:cannot_reinstate_and_set_stopped_date"
289 )
290 if request.reinstate_volunteer:
291 volunteer.stopped_volunteering = None
292 elif request.HasField("stopped_volunteering"):
293 stopped_volunteering = parse_date(request.stopped_volunteering.value)
294 if not stopped_volunteering:
295 context.abort_with_error_code(
296 grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_stopped_volunteering_date"
297 )
298 volunteer.stopped_volunteering = stopped_volunteering
300 # Update show_on_team_page if provided
301 if request.HasField("show_on_team_page"):
302 volunteer.show_on_team_page = request.show_on_team_page.value
304 session.flush()
306 return volunteer_to_pb(session, volunteer)
308 def ListVolunteers(
309 self, request: editor_pb2.ListVolunteersReq, context: CouchersContext, session: Session
310 ) -> editor_pb2.ListVolunteersRes:
311 # Query volunteers
312 query = select(Volunteer).join(LiteUser, LiteUser.id == Volunteer.user_id).where(LiteUser.is_visible)
314 # Filter based on include_past flag
315 if not request.include_past:
316 query = query.where(Volunteer.stopped_volunteering.is_(None))
318 # Order by same criteria as public API
319 query = query.order_by(
320 Volunteer.sort_key.asc().nulls_last(),
321 Volunteer.stopped_volunteering.desc().nulls_first(),
322 Volunteer.started_volunteering.asc(),
323 )
325 volunteers = session.execute(query).scalars().all()
327 return editor_pb2.ListVolunteersRes(
328 volunteers=[volunteer_to_pb(session, volunteer) for volunteer in volunteers]
329 )
331 def ListPostcards(
332 self, request: editor_pb2.ListPostcardsReq, context: CouchersContext, session: Session
333 ) -> editor_pb2.ListPostcardsRes:
334 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
335 next_id = int(request.page_token) if request.page_token else None
337 query = (
338 select(PostalVerificationAttempt, User)
339 .join(User, User.id == PostalVerificationAttempt.user_id)
340 .order_by(PostalVerificationAttempt.id.desc())
341 .limit(page_size + 1)
342 )
343 if next_id is not None:
344 query = query.where(PostalVerificationAttempt.id <= next_id)
346 results = session.execute(query).all()
348 def _attempt_to_pb(attempt: PostalVerificationAttempt, user: User) -> editor_pb2.PostcardInfo:
349 return editor_pb2.PostcardInfo(
350 postal_verification_attempt_id=attempt.id,
351 user_id=attempt.user_id,
352 username=user.username,
353 name=user.name,
354 status=postalverificationstatus2pb.get(
355 attempt.status, postal_verification_pb2.POSTAL_VERIFICATION_STATUS_UNKNOWN
356 ),
357 address=postal_verification_pb2.PostalAddress(
358 address_line_1=attempt.address_line_1,
359 address_line_2=attempt.address_line_2,
360 city=attempt.city,
361 state=attempt.state,
362 postal_code=attempt.postal_code,
363 country_code=attempt.country_code,
364 ),
365 created=Timestamp_from_datetime(attempt.created),
366 postcard_sent_at=Timestamp_from_datetime(attempt.postcard_sent_at)
367 if attempt.postcard_sent_at
368 else None,
369 verified_at=Timestamp_from_datetime(attempt.verified_at) if attempt.verified_at else None,
370 )
372 return editor_pb2.ListPostcardsRes(
373 postcards=[_attempt_to_pb(attempt, user) for attempt, user in results[:page_size]],
374 next_page_token=str(results[-1][0].id) if len(results) > page_size else None,
375 )
377 def DownloadPostcardPdf(
378 self, request: editor_pb2.DownloadPostcardPdfReq, context: CouchersContext, session: Session
379 ) -> editor_pb2.DownloadPostcardPdfRes:
380 attempt = session.execute(
381 select(PostalVerificationAttempt).where(
382 PostalVerificationAttempt.id == request.postal_verification_attempt_id
383 )
384 ).scalar_one_or_none()
386 if not attempt:
387 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "postal_verification_attempt_not_found")
389 if not attempt.mypostcard_job_id:
390 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:postcard_not_sent")
392 pdf_data = download_pdf(attempt.mypostcard_job_id)
393 return editor_pb2.DownloadPostcardPdfRes(pdf=pdf_data)