Coverage for app / backend / src / couchers / servicers / editor.py: 91%
148 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import json
2import logging
4import grpc
5from geoalchemy2.shape import from_shape
6from google.protobuf import empty_pb2
7from shapely.geometry import shape
8from shapely.geometry.base import BaseGeometry
9from sqlalchemy import select
10from sqlalchemy.orm import Session
11from sqlalchemy.sql import exists, update
13from couchers import urls
14from couchers.context import CouchersContext
15from couchers.db import session_scope
16from couchers.helpers.clusters import CHILD_NODE_TYPE, create_cluster, create_node
17from couchers.jobs.enqueue import queue_job
18from couchers.materialized_views import LiteUser
19from couchers.models import EventCommunityInviteRequest, Node, User, Volunteer
20from couchers.models.notifications import NotificationTopicAction
21from couchers.notifications.notify import notify
22from couchers.proto import communities_pb2, editor_pb2, editor_pb2_grpc, notification_data_pb2
23from couchers.proto.internal import jobs_pb2
24from couchers.resources import get_static_badge_dict
25from couchers.servicers.communities import community_to_pb
26from couchers.servicers.events import generate_event_create_notifications, get_users_to_notify_for_new_event
27from couchers.servicers.public import format_volunteer_link
28from couchers.utils import date_to_api, now, parse_date
# Module-level logger, named after this module.
30logger = logging.getLogger(__name__)
# Used both as the default and as the upper bound for `page_size` in
# Editor.ListEventCommunityInviteRequests.
32MAX_PAGINATION_LENGTH = 250
def load_community_geom(geojson: str, context: CouchersContext) -> BaseGeometry:
    """Decode a GeoJSON string into a shapely geometry, accepting only MultiPolygons.

    Any other geometry type aborts the call through ``context`` with
    INVALID_ARGUMENT / "no_multipolygon".
    """
    geojson_mapping = json.loads(geojson)
    geometry = shape(geojson_mapping)

    is_multipolygon = geometry.geom_type == "MultiPolygon"
    if not is_multipolygon:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_multipolygon")

    return geometry
def volunteer_to_pb(session: Session, volunteer: Volunteer) -> editor_pb2.Volunteer:
    """Build the editor protobuf message for a Volunteer row.

    The matching LiteUser is loaded to supply the username and avatar, and to
    provide fallbacks for the display name and location when the volunteer row
    leaves them empty.
    """
    user_lookup = select(LiteUser).where(LiteUser.id == volunteer.user_id)
    user = session.execute(user_lookup).scalar_one()
    board_member_ids = set(get_static_badge_dict()["board_member"])

    # Only build a media URL when the user actually has an avatar.
    avatar_url = None
    if user.avatar_filename:
        avatar_url = urls.media_url(filename=user.avatar_filename, size="thumbnail")

    started = date_to_api(volunteer.started_volunteering)

    # stopped_volunteering is left unset while it has no value.
    stopped = None
    if volunteer.stopped_volunteering:
        stopped = date_to_api(volunteer.stopped_volunteering)

    link_fields = format_volunteer_link(volunteer, user.username)

    return editor_pb2.Volunteer(
        user_id=volunteer.user_id,
        name=volunteer.display_name or user.name,
        username=user.username,
        is_board_member=user.id in board_member_ids,
        role=volunteer.role,
        location=volunteer.display_location or user.city,
        img=avatar_url,
        sort_key=volunteer.sort_key,
        started_volunteering=started,
        stopped_volunteering=stopped,
        show_on_team_page=volunteer.show_on_team_page,
        **link_fields,
    )
def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload) -> None:
    """Send a "new blog post" notification to every visible user.

    Queued as a job by Editor.SendBlogPostNotification; the payload carries the
    post's url, title and blurb. The url doubles as the notification key.
    """
    visible_user_ids_query = select(User.id).where(User.is_visible)

    with session_scope() as session:
        # Materialise the id list up front, before any notify() call runs.
        recipient_ids = session.execute(visible_user_ids_query).scalars().all()

        for recipient_id in recipient_ids:
            blog_post_data = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.general__new_blog_post,
                key=payload.url,
                data=blog_post_data,
            )
class Editor(editor_pb2_grpc.EditorServicer):
    """Servicer for the Editor gRPC API.

    Covers community creation/updates, event community invite requests,
    blog post notifications, and volunteer records. Error paths go through
    ``context.abort_with_error_code``; the code following each abort assumes
    that call does not return (e.g. it dereferences the object just found to
    be missing) -- presumably it raises.
    """

    def CreateCommunity(
        self, request: editor_pb2.CreateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Create a node plus its cluster from the request's GeoJSON.

        A ``parent_node_id`` of 0 is treated as "no parent". Aborts with
        NOT_FOUND / "parent_node_not_found" when a non-zero parent id does not
        match any node.
        """
        geom = load_community_geom(request.geojson, context)

        parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
        if parent_node_id is not None:
            parent_node = session.execute(select(Node).where(Node.id == parent_node_id)).scalar_one_or_none()
            if not parent_node:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "parent_node_not_found")
            parent_type = parent_node.node_type
        else:
            parent_type = None
        # The new node's type is looked up from its parent's type (None for a parentless node).
        node_type = CHILD_NODE_TYPE[parent_type]
        node = create_node(session, geom, parent_node_id, node_type)
        create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)

        return community_to_pb(session, node, context)

    def UpdateCommunity(
        self, request: editor_pb2.UpdateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Update a community's name, description, geometry and/or parent.

        Empty strings and a ``parent_node_id`` of 0 leave the corresponding
        field unchanged. Aborts with NOT_FOUND when the community
        ("community_not_found") or the requested parent
        ("parent_node_not_found") does not exist.
        """
        node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
        if not node:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
        cluster = node.official_cluster

        if request.name:
            cluster.name = request.name

        if request.description:
            cluster.description = request.description

        if request.geojson:
            geom = load_community_geom(request.geojson, context)

            node.geom = from_shape(geom)

        if request.parent_node_id != 0:
            # Validate the new parent the same way CreateCommunity does, so a bad id is
            # reported as NOT_FOUND instead of surfacing later as a database error.
            parent_exists = session.execute(select(exists().where(Node.id == request.parent_node_id))).scalar()
            if not parent_exists:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "parent_node_not_found")
            node.parent_node_id = request.parent_node_id

        session.flush()

        return community_to_pb(session, cluster.parent_node, context)

    def ListEventCommunityInviteRequests(
        self, request: editor_pb2.ListEventCommunityInviteRequestsReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListEventCommunityInviteRequestsRes:
        """List invite requests whose ``approved`` is still NULL, ordered by id.

        Pagination is keyed on the request id: the page token is the id of the
        first request of the next page. ``page_size`` defaults to, and is capped
        at, MAX_PAGINATION_LENGTH.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_request_id = int(request.page_token) if request.page_token else 0
        # Fetch one row beyond the page: its presence signals another page, and its id
        # becomes the next page token.
        requests = (
            session.execute(
                select(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.approved.is_(None))
                .where(EventCommunityInviteRequest.id >= next_request_id)
                .order_by(EventCommunityInviteRequest.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        def _request_to_pb(invite_request: EventCommunityInviteRequest) -> editor_pb2.EventCommunityInviteRequest:
            # Parameter deliberately not named `request`, to avoid shadowing the RPC request.
            users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite_request.occurrence)
            return editor_pb2.EventCommunityInviteRequest(
                event_community_invite_request_id=invite_request.id,
                user_id=invite_request.user_id,
                event_url=urls.event_link(
                    occurrence_id=invite_request.occurrence.id, slug=invite_request.occurrence.event.slug
                ),
                approx_users_to_notify=len(users_to_notify),
                community_id=node_id,
            )

        return editor_pb2.ListEventCommunityInviteRequestsRes(
            requests=[_request_to_pb(invite_request) for invite_request in requests[:page_size]],
            next_page_token=str(requests[-1].id) if len(requests) > page_size else None,
        )

    def DecideEventCommunityInviteRequest(
        self, request: editor_pb2.DecideEventCommunityInviteRequestReq, context: CouchersContext, session: Session
    ) -> editor_pb2.DecideEventCommunityInviteRequestRes:
        """Approve or deny an event community invite request.

        On approval, every other undecided request for the same occurrence is
        marked denied and a job is queued to generate the event notifications.
        Aborts with NOT_FOUND when the request does not exist and with
        FAILED_PRECONDITION when it has already been decided.
        """
        req = session.execute(
            select(EventCommunityInviteRequest).where(
                EventCommunityInviteRequest.id == request.event_community_invite_request_id
            )
        ).scalar_one_or_none()

        if not req:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_community_invite_not_found")

        # `decided` is a nullable timestamp: test for NULL explicitly, mirroring the
        # `.is_(None)` filter used below.
        if req.decided is not None:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_decided")

        decided = now()
        req.decided = decided
        req.decided_by_user_id = context.user_id
        req.approved = request.approve

        # deny other reqs for the same event
        if request.approve:
            session.execute(
                update(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
                .where(EventCommunityInviteRequest.decided.is_(None))
                .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
            )

        session.flush()

        if request.approve:
            queue_job(
                session,
                job=generate_event_create_notifications,
                payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
                    inviting_user_id=req.user_id,
                    occurrence_id=req.occurrence_id,
                    approved=True,
                ),
            )

        return editor_pb2.DecideEventCommunityInviteRequestRes()

    def SendBlogPostNotification(
        self, request: editor_pb2.SendBlogPostNotificationReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Queue the job that notifies users about a new blog post.

        Aborts with FAILED_PRECONDITION when the title is longer than 50
        characters or the blurb is longer than 100 characters.
        """
        if len(request.title) > 50:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_title_too_long")
        if len(request.blurb) > 100:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_blurb_too_long")
        queue_job(
            session,
            job=generate_new_blog_post_notifications,
            payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload(
                url=request.url,
                title=request.title,
                blurb=request.blurb,
            ),
        )
        return empty_pb2.Empty()

    def MakeUserVolunteer(
        self, request: editor_pb2.MakeUserVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Create a Volunteer record for an existing user.

        Aborts with NOT_FOUND when the user does not exist, FAILED_PRECONDITION
        when the user already has a volunteer record, and INVALID_ARGUMENT when
        a supplied ``started_volunteering`` value does not parse as a date.
        """
        # Check if user exists
        if not session.execute(select(exists().where(User.id == request.user_id))).scalar():
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        # Check if user is already a volunteer
        if session.execute(select(exists().where(Volunteer.user_id == request.user_id))).scalar():
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_volunteer")

        # Parse started_volunteering date
        started_volunteering = None
        if request.started_volunteering:
            started_volunteering = parse_date(request.started_volunteering)
            if not started_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date")

        # Create a volunteer record
        volunteer = Volunteer(
            user_id=request.user_id,
            role=request.role,
            show_on_team_page=not request.hide_on_team_page,
        )
        # Only set the start date when one was supplied; otherwise the attribute is left untouched.
        if started_volunteering:
            volunteer.started_volunteering = started_volunteering
        session.add(volunteer)
        session.flush()

        return volunteer_to_pb(session, volunteer)

    def UpdateVolunteer(
        self, request: editor_pb2.UpdateVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Update the fields of a volunteer record that are present in the request.

        Each field is only touched when ``request.HasField`` reports it as set.
        Aborts with NOT_FOUND when the user has no volunteer record and with
        INVALID_ARGUMENT when a supplied date does not parse.
        """
        # Check if volunteer exists
        volunteer = session.execute(select(Volunteer).where(Volunteer.user_id == request.user_id)).scalar_one_or_none()
        if not volunteer:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "volunteer_not_found")

        # Update role if provided
        if request.HasField("role"):
            volunteer.role = request.role.value

        # Update sort_key if provided
        if request.HasField("sort_key"):
            volunteer.sort_key = request.sort_key.value

        # Update started_volunteering if provided
        if request.HasField("started_volunteering"):
            started_volunteering = parse_date(request.started_volunteering.value)
            if not started_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date")
            volunteer.started_volunteering = started_volunteering

        # Update stopped_volunteering if provided
        if request.HasField("stopped_volunteering"):
            stopped_volunteering = parse_date(request.stopped_volunteering.value)
            if not stopped_volunteering:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_stopped_volunteering_date")
            volunteer.stopped_volunteering = stopped_volunteering

        # Update show_on_team_page if provided
        if request.HasField("show_on_team_page"):
            volunteer.show_on_team_page = request.show_on_team_page.value

        session.flush()

        return volunteer_to_pb(session, volunteer)

    def ListVolunteers(
        self, request: editor_pb2.ListVolunteersReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListVolunteersRes:
        """List volunteers whose user is visible.

        Volunteers with a ``stopped_volunteering`` date are excluded unless
        ``request.include_past`` is set.
        """
        # Query volunteers
        query = select(Volunteer).join(LiteUser, LiteUser.id == Volunteer.user_id).where(LiteUser.is_visible)

        # Filter based on include_past flag
        if not request.include_past:
            query = query.where(Volunteer.stopped_volunteering.is_(None))

        # Order by same criteria as public API
        query = query.order_by(
            Volunteer.sort_key.asc().nulls_last(),
            Volunteer.stopped_volunteering.desc().nulls_first(),
            Volunteer.started_volunteering.asc(),
        )

        volunteers = session.execute(query).scalars().all()

        return editor_pb2.ListVolunteersRes(
            volunteers=[volunteer_to_pb(session, volunteer) for volunteer in volunteers]
        )