Coverage for app / backend / src / couchers / servicers / editor.py: 94%
141 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import json
2import logging
4import grpc
5from geoalchemy2.shape import from_shape
6from google.protobuf import empty_pb2
7from shapely.geometry import shape
8from shapely.geometry.base import BaseGeometry
9from sqlalchemy import select
10from sqlalchemy.orm import Session
11from sqlalchemy.sql import exists, update
13from couchers import urls
14from couchers.context import CouchersContext
15from couchers.db import session_scope
16from couchers.helpers.clusters import create_cluster, create_node
17from couchers.jobs.enqueue import queue_job
18from couchers.materialized_views import LiteUser
19from couchers.models import EventCommunityInviteRequest, Node, User, Volunteer
20from couchers.models.notifications import NotificationTopicAction
21from couchers.notifications.notify import notify
22from couchers.proto import communities_pb2, editor_pb2, editor_pb2_grpc, notification_data_pb2
23from couchers.proto.internal import jobs_pb2
24from couchers.resources import get_static_badge_dict
25from couchers.servicers.communities import community_to_pb
26from couchers.servicers.events import generate_event_create_notifications, get_users_to_notify_for_new_event
27from couchers.servicers.public import format_volunteer_link
28from couchers.utils import date_to_api, now, parse_date
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Upper bound (and default) for the page size of paginated editor listings.
MAX_PAGINATION_LENGTH: int = 250
def load_community_geom(geojson: str, context: CouchersContext) -> BaseGeometry:
    """Parse a GeoJSON string into a geometry, accepting only MultiPolygons.

    Args:
        geojson: GeoJSON document as a string.
        context: Call context used to abort the RPC on invalid input.

    Returns:
        The parsed geometry.

    Aborts with INVALID_ARGUMENT ("no_multipolygon") when the parsed geometry
    is anything other than a MultiPolygon.
    """
    parsed_geojson = json.loads(geojson)
    geometry = shape(parsed_geojson)

    is_multipolygon = geometry.geom_type == "MultiPolygon"
    if not is_multipolygon:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_multipolygon")

    return geometry
def volunteer_to_pb(session: Session, volunteer: Volunteer) -> editor_pb2.Volunteer:
    """Build the editor protobuf message for a Volunteer row.

    Display name and location fall back to the user's own name and city when
    the volunteer record does not override them. The user is looked up in
    LiteUser by ``volunteer.user_id``; exactly one match is required.
    """
    user_lookup = select(LiteUser).where(LiteUser.id == volunteer.user_id)
    lite_user = session.execute(user_lookup).scalar_one()
    board_members = set(get_static_badge_dict()["board_member"])

    # Optional fields stay None when there is nothing to show.
    avatar_url = None
    if lite_user.avatar_filename:
        avatar_url = urls.media_url(filename=lite_user.avatar_filename, size="thumbnail")

    stopped_volunteering = None
    if volunteer.stopped_volunteering:
        stopped_volunteering = date_to_api(volunteer.stopped_volunteering)

    link_fields = format_volunteer_link(volunteer, lite_user.username)

    return editor_pb2.Volunteer(
        user_id=volunteer.user_id,
        name=volunteer.display_name or lite_user.name,
        username=lite_user.username,
        is_board_member=lite_user.id in board_members,
        role=volunteer.role,
        location=volunteer.display_location or lite_user.city,
        img=avatar_url,
        sort_key=volunteer.sort_key,
        started_volunteering=date_to_api(volunteer.started_volunteering),
        stopped_volunteering=stopped_volunteering,
        show_on_team_page=volunteer.show_on_team_page,
        **link_fields,
    )
def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload) -> None:
    """Send a "new blog post" notification to every visible user.

    Opens its own database session, selects the ids of all users for which
    ``User.is_visible`` holds, and calls ``notify`` once per user with the
    post's URL (also used as the notification key), title and blurb.
    """
    with session_scope() as session:
        visible_ids_query = select(User.id).where(User.is_visible)
        recipient_ids = session.execute(visible_ids_query).scalars().all()

        for recipient_id in recipient_ids:
            blog_post_data = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.general__new_blog_post,
                key=payload.url,
                data=blog_post_data,
            )
class Editor(editor_pb2_grpc.EditorServicer):
    """Servicer for the Editor gRPC API.

    Covers community creation and updates, decisions on event community
    invite requests, blog post notifications, and volunteer management.
    Every handler receives the request message, the call context and a
    database session.
    """

    def CreateCommunity(
        self, request: editor_pb2.CreateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Create a community from a GeoJSON geometry and return it.

        The geometry is validated by ``load_community_geom``, which aborts the
        call when it is not a MultiPolygon.
        """
        geom = load_community_geom(request.geojson, context)

        # A parent_node_id of 0 is treated as "no parent".
        parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
        node = create_node(session, geom, parent_node_id)
        create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)

        return community_to_pb(session, node, context)

    def UpdateCommunity(
        self, request: editor_pb2.UpdateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Update a community's name, description, geometry and/or parent.

        Only fields carrying a value are applied: empty strings and a
        parent_node_id of 0 leave the corresponding attribute untouched.
        Aborts with NOT_FOUND ("community_not_found") when no node has the
        requested id.
        """
        node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
        if not node:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
        cluster = node.official_cluster

        if request.name:
            cluster.name = request.name

        if request.description:
            cluster.description = request.description

        if request.geojson:
            geom = load_community_geom(request.geojson, context)
            node.geom = from_shape(geom)

        if request.parent_node_id != 0:
            node.parent_node_id = request.parent_node_id

        session.flush()

        return community_to_pb(session, cluster.parent_node, context)

    def ListEventCommunityInviteRequests(
        self, request: editor_pb2.ListEventCommunityInviteRequestsReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListEventCommunityInviteRequestsRes:
        """List invite requests whose ``approved`` is still NULL, ordered by id.

        The page size defaults to, and is capped at, MAX_PAGINATION_LENGTH.
        The page token is the id of the first request of the next page.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_request_id = int(request.page_token) if request.page_token else 0
        # Fetch one row beyond the page so we can tell whether another page exists.
        pending_requests = (
            session.execute(
                select(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.approved.is_(None))
                .where(EventCommunityInviteRequest.id >= next_request_id)
                .order_by(EventCommunityInviteRequest.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        # The parameter is deliberately not called `request`, so it cannot be
        # confused with the RPC request of the enclosing handler.
        def _request_to_pb(invite_request: EventCommunityInviteRequest) -> editor_pb2.EventCommunityInviteRequest:
            users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite_request.occurrence)
            return editor_pb2.EventCommunityInviteRequest(
                event_community_invite_request_id=invite_request.id,
                user_id=invite_request.user_id,
                event_url=urls.event_link(
                    occurrence_id=invite_request.occurrence.id, slug=invite_request.occurrence.event.slug
                ),
                approx_users_to_notify=len(users_to_notify),
                community_id=node_id,
            )

        return editor_pb2.ListEventCommunityInviteRequestsRes(
            requests=[_request_to_pb(invite_request) for invite_request in pending_requests[:page_size]],
            # The extra (page_size + 1)-th row, if present, starts the next page.
            next_page_token=str(pending_requests[-1].id) if len(pending_requests) > page_size else None,
        )

    def DecideEventCommunityInviteRequest(
        self, request: editor_pb2.DecideEventCommunityInviteRequestReq, context: CouchersContext, session: Session
    ) -> editor_pb2.DecideEventCommunityInviteRequestRes:
        """Approve or deny an event community invite request.

        Records the decision time and the deciding user. On approval, every
        other still-undecided request for the same occurrence is denied and a
        ``generate_event_create_notifications`` job is queued.

        Aborts with NOT_FOUND ("event_community_invite_not_found") when the
        request does not exist and with FAILED_PRECONDITION
        ("event_community_invite_already_decided") when it was already decided.
        """
        req = session.execute(
            select(EventCommunityInviteRequest).where(
                EventCommunityInviteRequest.id == request.event_community_invite_request_id
            )
        ).scalar_one_or_none()

        if not req:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_community_invite_not_found")

        if req.decided:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_decided")

        decided = now()
        req.decided = decided
        req.decided_by_user_id = context.user_id
        req.approved = request.approve

        # deny other reqs for the same event
        if request.approve:
            session.execute(
                update(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
                # Exclude the request being approved explicitly instead of
                # relying on its pending `decided` change being flushed first.
                .where(EventCommunityInviteRequest.id != req.id)
                .where(EventCommunityInviteRequest.decided.is_(None))
                .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
            )

        session.flush()

        if request.approve:
            queue_job(
                session,
                job=generate_event_create_notifications,
                payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
                    inviting_user_id=req.user_id,
                    occurrence_id=req.occurrence_id,
                    approved=True,
                ),
            )

        return editor_pb2.DecideEventCommunityInviteRequestRes()

    def SendBlogPostNotification(
        self, request: editor_pb2.SendBlogPostNotificationReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Queue a job that notifies users about a new blog post.

        Aborts with FAILED_PRECONDITION when the title is longer than 50
        characters ("admin_blog_title_too_long") or the blurb is longer than
        100 characters ("admin_blog_blurb_too_long").
        """
        if len(request.title) > 50:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_title_too_long")
        if len(request.blurb) > 100:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_blurb_too_long")
        queue_job(
            session,
            job=generate_new_blog_post_notifications,
            payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload(
                url=request.url,
                title=request.title,
                blurb=request.blurb,
            ),
        )
        return empty_pb2.Empty()

    def MakeUserVolunteer(
        self, request: editor_pb2.MakeUserVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Create a Volunteer record for an existing user and return it.

        Aborts with NOT_FOUND ("user_not_found") when the user does not exist,
        FAILED_PRECONDITION ("user_already_volunteer") when a record already
        exists, and INVALID_ARGUMENT ("invalid_started_volunteering_date")
        when a start date is supplied but cannot be parsed.
        """
        # Check if user exists
        if not session.execute(select(exists().where(User.id == request.user_id))).scalar():
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        # Check if user is already a volunteer
        if session.execute(select(exists().where(Volunteer.user_id == request.user_id))).scalar():
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_volunteer")

        # Parse started_volunteering date
        started_volunteering = None
        if request.started_volunteering:
            started_volunteering = parse_date(request.started_volunteering)
            if not started_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date")

        # Create a volunteer record
        volunteer = Volunteer(
            user_id=request.user_id,
            role=request.role,
            show_on_team_page=not request.hide_on_team_page,
        )
        # Only set the start date when one was supplied; otherwise the
        # attribute is left unset on the new record.
        if started_volunteering:
            volunteer.started_volunteering = started_volunteering
        session.add(volunteer)
        session.flush()

        return volunteer_to_pb(session, volunteer)

    def UpdateVolunteer(
        self, request: editor_pb2.UpdateVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Update the fields of a volunteer that are present on the request.

        A field is applied only when ``request.HasField`` reports it as set.
        Aborts with NOT_FOUND ("volunteer_not_found") when the user has no
        volunteer record, and with INVALID_ARGUMENT when a supplied start or
        stop date cannot be parsed.
        """
        # Check if volunteer exists
        volunteer = session.execute(select(Volunteer).where(Volunteer.user_id == request.user_id)).scalar_one_or_none()
        if not volunteer:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "volunteer_not_found")

        # Update role if provided
        if request.HasField("role"):
            volunteer.role = request.role.value

        # Update sort_key if provided
        if request.HasField("sort_key"):
            volunteer.sort_key = request.sort_key.value

        # Update started_volunteering if provided
        if request.HasField("started_volunteering"):
            started_volunteering = parse_date(request.started_volunteering.value)
            if not started_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date")
            volunteer.started_volunteering = started_volunteering

        # Update stopped_volunteering if provided
        if request.HasField("stopped_volunteering"):
            stopped_volunteering = parse_date(request.stopped_volunteering.value)
            if not stopped_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_stopped_volunteering_date")
            volunteer.stopped_volunteering = stopped_volunteering

        # Update show_on_team_page if provided
        if request.HasField("show_on_team_page"):
            volunteer.show_on_team_page = request.show_on_team_page.value

        session.flush()

        return volunteer_to_pb(session, volunteer)

    def ListVolunteers(
        self, request: editor_pb2.ListVolunteersReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListVolunteersRes:
        """List volunteers whose user is visible.

        Volunteers with a stop date are included only when
        ``request.include_past`` is set.
        """
        # Query volunteers
        query = select(Volunteer).join(LiteUser, LiteUser.id == Volunteer.user_id).where(LiteUser.is_visible)

        # Filter based on include_past flag
        if not request.include_past:
            query = query.where(Volunteer.stopped_volunteering.is_(None))

        # Order by same criteria as public API
        query = query.order_by(
            Volunteer.sort_key.asc().nulls_last(),
            Volunteer.stopped_volunteering.desc().nulls_first(),
            Volunteer.started_volunteering.asc(),
        )

        volunteers = session.execute(query).scalars().all()

        return editor_pb2.ListVolunteersRes(
            volunteers=[volunteer_to_pb(session, volunteer) for volunteer in volunteers]
        )