Coverage for app/backend/src/couchers/servicers/editor.py: 83%
173 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import json
2import logging
4import grpc
5from geoalchemy2.shape import from_shape
6from google.protobuf import empty_pb2
7from shapely.geometry import shape
8from shapely.geometry.base import BaseGeometry
9from sqlalchemy import select
10from sqlalchemy.orm import Session
11from sqlalchemy.sql import exists, update
13from couchers import urls
14from couchers.context import CouchersContext
15from couchers.db import session_scope
16from couchers.helpers.clusters import CHILD_NODE_TYPE, create_cluster, create_node
17from couchers.jobs.enqueue import queue_job
18from couchers.materialized_views import LiteUser
19from couchers.models import EventCommunityInviteRequest, Node, User, Volunteer
20from couchers.models.notifications import NotificationTopicAction
21from couchers.models.postal_verification import PostalVerificationAttempt
22from couchers.notifications.notify import notify
23from couchers.postal.my_postcard import download_pdf
24from couchers.proto import communities_pb2, editor_pb2, editor_pb2_grpc, notification_data_pb2, postal_verification_pb2
25from couchers.proto.internal import jobs_pb2
26from couchers.resources import get_static_badge_dict
27from couchers.servicers.communities import community_to_pb
28from couchers.servicers.events import generate_event_create_notifications, get_users_to_notify_for_new_event
29from couchers.servicers.postal_verification import postalverificationstatus2pb
30from couchers.servicers.public import format_volunteer_link
31from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Largest page size served by the paginated list RPCs in this module; it is also the page size
# used when a request leaves page_size unset (0).
MAX_PAGINATION_LENGTH = 250
def load_community_geom(geojson: str, context: CouchersContext) -> BaseGeometry:
    """Parse a GeoJSON geometry string into a shapely geometry, accepting only MultiPolygons.

    Args:
        geojson: JSON text describing a geometry; it is decoded with ``json.loads`` and handed
            to ``shapely.geometry.shape``.
        context: Request context used to abort the call when the geometry has the wrong type.

    Returns:
        The parsed geometry.

    Any geometry whose ``geom_type`` is not ``"MultiPolygon"`` aborts the call with
    INVALID_ARGUMENT / "no_multipolygon". Malformed JSON is not caught here, so decoding errors
    propagate to the caller.
    """
    geometry = shape(json.loads(geojson))

    is_multipolygon = geometry.geom_type == "MultiPolygon"
    if not is_multipolygon:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_multipolygon")

    return geometry
def volunteer_to_pb(session: Session, volunteer: Volunteer) -> editor_pb2.Volunteer:
    """Build the editor protobuf message for a Volunteer row.

    The volunteer's own ``display_name`` / ``display_location`` take precedence; when they are
    empty the name and city of the matching LiteUser row are used instead. The LiteUser lookup
    uses ``scalar_one()``, so it raises if there is not exactly one row for the volunteer's user.
    """
    user = session.execute(select(LiteUser).where(LiteUser.id == volunteer.user_id)).scalar_one()
    board_member_ids = set(get_static_badge_dict()["board_member"])

    # Only build a thumbnail URL when the user actually has an avatar.
    avatar_url = None
    if user.avatar_filename:
        avatar_url = urls.media_url(filename=user.avatar_filename, size="thumbnail")

    # stopped_volunteering stays unset for volunteers who have not stopped.
    stopped = None
    if volunteer.stopped_volunteering:
        stopped = date_to_api(volunteer.stopped_volunteering)

    # Extra keyword fields describing the volunteer's link, as produced by the public servicer helper.
    link_fields = format_volunteer_link(volunteer, user.username)

    return editor_pb2.Volunteer(
        user_id=volunteer.user_id,
        name=volunteer.display_name or user.name,
        username=user.username,
        is_board_member=user.id in board_member_ids,
        role=volunteer.role,
        location=volunteer.display_location or user.city,
        img=avatar_url,
        sort_key=volunteer.sort_key,
        started_volunteering=date_to_api(volunteer.started_volunteering),
        stopped_volunteering=stopped,
        show_on_team_page=volunteer.show_on_team_page,
        **link_fields,
    )
def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload) -> None:
    """Job body: send a "new blog post" notification to every visible user.

    Opens its own database session, loads the ids of all users for which ``User.is_visible``
    holds, and calls ``notify`` once per user. The post URL doubles as the notification key.
    """
    visible_user_ids_query = select(User.id).where(User.is_visible)

    with session_scope() as session:
        recipient_ids = session.execute(visible_user_ids_query).scalars().all()
        for recipient_id in recipient_ids:
            # A fresh data message is built for every recipient.
            blog_post_data = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.general__new_blog_post,
                key=payload.url,
                data=blog_post_data,
            )
class Editor(editor_pb2_grpc.EditorServicer):
    """gRPC servicer for the editor API.

    Covers community creation/updates, event community invite requests, blog post notifications,
    volunteer management and postal verification postcards.
    """

    def CreateCommunity(
        self, request: editor_pb2.CreateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Create a community node together with its cluster and return the new community.

        Aborts with INVALID_ARGUMENT / "no_multipolygon" when the GeoJSON is not a MultiPolygon and
        with NOT_FOUND / "parent_node_not_found" when a non-zero ``parent_node_id`` matches no node.
        """
        geom = load_community_geom(request.geojson, context)

        # A parent_node_id of 0 means the community has no parent.
        parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
        if parent_node_id is not None:
            parent_node = session.execute(select(Node).where(Node.id == parent_node_id)).scalar_one_or_none()
            if not parent_node:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "parent_node_not_found")
            parent_type = parent_node.node_type
        else:
            parent_type = None
        # The type of the new node is looked up from the type of its parent (None for no parent).
        node_type = CHILD_NODE_TYPE[parent_type]
        node = create_node(session, geom, parent_node_id, node_type)
        create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)

        return community_to_pb(session, node, context)

    def UpdateCommunity(
        self, request: editor_pb2.UpdateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Update the name, description, geometry and/or parent of an existing community.

        An empty ``name``, ``description`` or ``geojson`` and a ``parent_node_id`` of 0 leave the
        corresponding value untouched.

        Aborts with NOT_FOUND / "community_not_found" when the community does not exist, with
        INVALID_ARGUMENT / "no_multipolygon" when the new GeoJSON is not a MultiPolygon, and with
        NOT_FOUND / "parent_node_not_found" when the requested parent node does not exist.
        """
        node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
        if not node:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
        cluster = node.official_cluster

        if request.name:
            cluster.name = request.name

        if request.description:
            cluster.description = request.description

        if request.geojson:
            geom = load_community_geom(request.geojson, context)

            node.geom = from_shape(geom)

        if request.parent_node_id != 0:
            # Same check as CreateCommunity: report an unknown parent as NOT_FOUND instead of
            # letting the bad id reach the database at flush time.
            parent_exists = session.execute(select(exists().where(Node.id == request.parent_node_id))).scalar()
            if not parent_exists:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "parent_node_not_found")
            node.parent_node_id = request.parent_node_id

        session.flush()

        return community_to_pb(session, cluster.parent_node, context)

    def ListEventCommunityInviteRequests(
        self, request: editor_pb2.ListEventCommunityInviteRequestsReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListEventCommunityInviteRequestsRes:
        """List invite requests whose ``approved`` value is still unset, ordered by id and paginated.

        The page token is the id of the first request of the next page; an empty token starts from
        the beginning. The page size is capped at MAX_PAGINATION_LENGTH.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_request_id = int(request.page_token) if request.page_token else 0
        # Fetch one row more than the page size: its presence signals that another page exists and
        # its id becomes the next page token.
        requests = (
            session.execute(
                select(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.approved.is_(None))
                .where(EventCommunityInviteRequest.id >= next_request_id)
                .order_by(EventCommunityInviteRequest.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        def _request_to_pb(invite_request: EventCommunityInviteRequest) -> editor_pb2.EventCommunityInviteRequest:
            # Named invite_request so it does not shadow the RPC's own ``request`` argument.
            users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite_request.occurrence)
            return editor_pb2.EventCommunityInviteRequest(
                event_community_invite_request_id=invite_request.id,
                user_id=invite_request.user_id,
                event_url=urls.event_link(
                    occurrence_id=invite_request.occurrence.id, slug=invite_request.occurrence.event.slug
                ),
                approx_users_to_notify=len(users_to_notify),
                community_id=node_id,
            )

        return editor_pb2.ListEventCommunityInviteRequestsRes(
            requests=[_request_to_pb(invite_request) for invite_request in requests[:page_size]],
            next_page_token=str(requests[-1].id) if len(requests) > page_size else None,
        )

    def DecideEventCommunityInviteRequest(
        self, request: editor_pb2.DecideEventCommunityInviteRequestReq, context: CouchersContext, session: Session
    ) -> editor_pb2.DecideEventCommunityInviteRequestRes:
        """Approve or deny an event community invite request.

        Records the decision time and the deciding user. On approval, the remaining undecided
        requests for the same occurrence are denied and a job is queued to generate the event
        notifications.

        Aborts with NOT_FOUND / "event_community_invite_not_found" when the request does not exist
        and with FAILED_PRECONDITION / "event_community_invite_already_decided" when it was
        already decided.
        """
        req = session.execute(
            select(EventCommunityInviteRequest).where(
                EventCommunityInviteRequest.id == request.event_community_invite_request_id
            )
        ).scalar_one_or_none()

        if not req:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_community_invite_not_found")

        if req.decided:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_decided")

        # Use one timestamp for this decision and for any requests denied as a consequence.
        decided = now()
        req.decided = decided
        req.decided_by_user_id = context.user_id
        req.approved = request.approve

        # deny other reqs for the same event
        if request.approve:
            session.execute(
                update(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
                .where(EventCommunityInviteRequest.decided.is_(None))
                .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
            )

        session.flush()

        if request.approve:
            queue_job(
                session,
                job=generate_event_create_notifications,
                payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
                    inviting_user_id=req.user_id,
                    occurrence_id=req.occurrence_id,
                    approved=True,
                ),
            )

        return editor_pb2.DecideEventCommunityInviteRequestRes()

    def SendBlogPostNotification(
        self, request: editor_pb2.SendBlogPostNotificationReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Queue a job that sends a "new blog post" notification.

        Aborts with FAILED_PRECONDITION when the title is longer than 50 characters
        ("admin_blog_title_too_long") or the blurb is longer than 100 characters
        ("admin_blog_blurb_too_long").
        """
        if len(request.title) > 50:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_title_too_long")
        if len(request.blurb) > 100:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_blurb_too_long")
        queue_job(
            session,
            job=generate_new_blog_post_notifications,
            payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload(
                url=request.url,
                title=request.title,
                blurb=request.blurb,
            ),
        )
        return empty_pb2.Empty()

    def MakeUserVolunteer(
        self, request: editor_pb2.MakeUserVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Create a Volunteer record for an existing user and return it.

        Aborts with NOT_FOUND / "user_not_found" when the user does not exist, with
        FAILED_PRECONDITION / "user_already_volunteer" when a volunteer record already exists, and
        with INVALID_ARGUMENT / "invalid_started_volunteering_date" when the supplied start date
        cannot be parsed.
        """
        # Check if user exists
        if not session.execute(select(exists().where(User.id == request.user_id))).scalar():
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        # Check if user is already a volunteer
        if session.execute(select(exists().where(Volunteer.user_id == request.user_id))).scalar():
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_volunteer")

        # Parse started_volunteering date
        started_volunteering = None
        if request.started_volunteering:
            started_volunteering = parse_date(request.started_volunteering)
            if not started_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date")

        # Create a volunteer record
        volunteer = Volunteer(
            user_id=request.user_id,
            role=request.role,
            show_on_team_page=not request.hide_on_team_page,
        )
        # Only set the start date when one was supplied; otherwise the attribute is left unset.
        if started_volunteering:
            volunteer.started_volunteering = started_volunteering
        session.add(volunteer)
        session.flush()

        return volunteer_to_pb(session, volunteer)

    def UpdateVolunteer(
        self, request: editor_pb2.UpdateVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Update the fields of an existing volunteer that are present in the request.

        Only fields for which ``HasField`` is true are changed. ``reinstate_volunteer`` clears
        ``stopped_volunteering`` and cannot be combined with an explicit stop date.

        Aborts with NOT_FOUND / "volunteer_not_found" when the user has no volunteer record and
        with INVALID_ARGUMENT for unparsable dates ("invalid_started_volunteering_date",
        "invalid_stopped_volunteering_date") or for the conflicting combination above
        ("cannot_reinstate_and_set_stopped_date").
        """
        # Check if volunteer exists
        volunteer = session.execute(select(Volunteer).where(Volunteer.user_id == request.user_id)).scalar_one_or_none()
        if not volunteer:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "volunteer_not_found")

        # Update role if provided
        if request.HasField("role"):
            volunteer.role = request.role.value

        # Update sort_key if provided
        if request.HasField("sort_key"):
            volunteer.sort_key = request.sort_key.value

        # Update started_volunteering if provided
        if request.HasField("started_volunteering"):
            started_volunteering = parse_date(request.started_volunteering.value)
            if not started_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date")
            volunteer.started_volunteering = started_volunteering

        # Reinstate (clear stopped_volunteering) or update stopped_volunteering
        if request.reinstate_volunteer and request.HasField("stopped_volunteering"):
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cannot_reinstate_and_set_stopped_date")
        if request.reinstate_volunteer:
            volunteer.stopped_volunteering = None
        elif request.HasField("stopped_volunteering"):
            stopped_volunteering = parse_date(request.stopped_volunteering.value)
            if not stopped_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_stopped_volunteering_date")
            volunteer.stopped_volunteering = stopped_volunteering

        # Update show_on_team_page if provided
        if request.HasField("show_on_team_page"):
            volunteer.show_on_team_page = request.show_on_team_page.value

        session.flush()

        return volunteer_to_pb(session, volunteer)

    def ListVolunteers(
        self, request: editor_pb2.ListVolunteersReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListVolunteersRes:
        """List volunteers whose user is visible, optionally including those who have stopped.

        Without ``include_past`` only volunteers with no ``stopped_volunteering`` date are returned.
        """
        # Query volunteers
        query = select(Volunteer).join(LiteUser, LiteUser.id == Volunteer.user_id).where(LiteUser.is_visible)

        # Filter based on include_past flag
        if not request.include_past:
            query = query.where(Volunteer.stopped_volunteering.is_(None))

        # Order by same criteria as public API
        query = query.order_by(
            Volunteer.sort_key.asc().nulls_last(),
            Volunteer.stopped_volunteering.desc().nulls_first(),
            Volunteer.started_volunteering.asc(),
        )

        volunteers = session.execute(query).scalars().all()

        return editor_pb2.ListVolunteersRes(
            volunteers=[volunteer_to_pb(session, volunteer) for volunteer in volunteers]
        )

    def ListPostcards(
        self, request: editor_pb2.ListPostcardsReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListPostcardsRes:
        """List postal verification attempts with their users, highest attempt id first, paginated.

        The page token is the id of the first attempt of the next page; an empty token starts from
        the highest id. The page size is capped at MAX_PAGINATION_LENGTH.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_id = int(request.page_token) if request.page_token else None

        # Fetch one row more than the page size: its presence signals that another page exists and
        # its id becomes the next page token.
        query = (
            select(PostalVerificationAttempt, User)
            .join(User, User.id == PostalVerificationAttempt.user_id)
            .order_by(PostalVerificationAttempt.id.desc())
            .limit(page_size + 1)
        )
        if next_id is not None:
            query = query.where(PostalVerificationAttempt.id <= next_id)

        results = session.execute(query).all()

        def _attempt_to_pb(attempt: PostalVerificationAttempt, user: User) -> editor_pb2.PostcardInfo:
            return editor_pb2.PostcardInfo(
                postal_verification_attempt_id=attempt.id,
                user_id=attempt.user_id,
                username=user.username,
                name=user.name,
                # Statuses missing from the mapping are reported as UNKNOWN.
                status=postalverificationstatus2pb.get(
                    attempt.status, postal_verification_pb2.POSTAL_VERIFICATION_STATUS_UNKNOWN
                ),
                address=postal_verification_pb2.PostalAddress(
                    address_line_1=attempt.address_line_1,
                    address_line_2=attempt.address_line_2,
                    city=attempt.city,
                    state=attempt.state,
                    postal_code=attempt.postal_code,
                    country_code=attempt.country_code,
                ),
                created=Timestamp_from_datetime(attempt.created),
                postcard_sent_at=Timestamp_from_datetime(attempt.postcard_sent_at)
                if attempt.postcard_sent_at
                else None,
                verified_at=Timestamp_from_datetime(attempt.verified_at) if attempt.verified_at else None,
            )

        return editor_pb2.ListPostcardsRes(
            postcards=[_attempt_to_pb(attempt, user) for attempt, user in results[:page_size]],
            next_page_token=str(results[-1][0].id) if len(results) > page_size else None,
        )

    def DownloadPostcardPdf(
        self, request: editor_pb2.DownloadPostcardPdfReq, context: CouchersContext, session: Session
    ) -> editor_pb2.DownloadPostcardPdfRes:
        """Return the PDF for the postcard of a postal verification attempt.

        Aborts with NOT_FOUND / "postal_verification_attempt_not_found" when the attempt does not
        exist and with FAILED_PRECONDITION / "postcard_not_sent" when the attempt has no
        ``mypostcard_job_id``.
        """
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == request.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        if not attempt:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "postal_verification_attempt_not_found")

        if not attempt.mypostcard_job_id:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "postcard_not_sent")

        pdf_data = download_pdf(attempt.mypostcard_job_id)
        return editor_pb2.DownloadPostcardPdfRes(pdf=pdf_data)