Coverage for app / backend / src / couchers / servicers / editor.py: 83%

173 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import json 

2import logging 

3 

4import grpc 

5from geoalchemy2.shape import from_shape 

6from google.protobuf import empty_pb2 

7from shapely.geometry import shape 

8from shapely.geometry.base import BaseGeometry 

9from sqlalchemy import select 

10from sqlalchemy.orm import Session 

11from sqlalchemy.sql import exists, update 

12 

13from couchers import urls 

14from couchers.context import CouchersContext 

15from couchers.db import session_scope 

16from couchers.helpers.clusters import CHILD_NODE_TYPE, create_cluster, create_node 

17from couchers.jobs.enqueue import queue_job 

18from couchers.materialized_views import LiteUser 

19from couchers.models import EventCommunityInviteRequest, Node, User, Volunteer 

20from couchers.models.notifications import NotificationTopicAction 

21from couchers.models.postal_verification import PostalVerificationAttempt 

22from couchers.notifications.notify import notify 

23from couchers.postal.my_postcard import download_pdf 

24from couchers.proto import communities_pb2, editor_pb2, editor_pb2_grpc, notification_data_pb2, postal_verification_pb2 

25from couchers.proto.internal import jobs_pb2 

26from couchers.resources import get_static_badge_dict 

27from couchers.servicers.communities import community_to_pb 

28from couchers.servicers.events import generate_event_create_notifications, get_users_to_notify_for_new_event 

29from couchers.servicers.postal_verification import postalverificationstatus2pb 

30from couchers.servicers.public import format_volunteer_link 

31from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date 

32 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Hard cap on the page size honoured by the paginated List* RPCs in this module;
# it is also the page size used when the client does not supply one.
MAX_PAGINATION_LENGTH = 250

36 

37 

def load_community_geom(geojson: str, context: CouchersContext) -> BaseGeometry:
    """Parse a GeoJSON string into a shapely geometry, accepting only MultiPolygons.

    Args:
        geojson: A JSON-encoded GeoJSON geometry.
        context: Request context, used to report an invalid geometry type.

    Returns:
        The geometry built by ``shapely.geometry.shape`` from the decoded JSON.

    If the geometry's ``geom_type`` is anything other than ``"MultiPolygon"``,
    ``context.abort_with_error_code`` is called with ``INVALID_ARGUMENT`` and the
    ``no_multipolygon`` error code.
    """
    decoded = json.loads(geojson)
    geometry = shape(decoded)

    is_multipolygon = geometry.geom_type == "MultiPolygon"
    if not is_multipolygon:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_multipolygon")

    return geometry

45 

46 

def volunteer_to_pb(session: Session, volunteer: Volunteer) -> editor_pb2.Volunteer:
    """Build the editor protobuf message for a ``Volunteer`` row.

    The matching ``LiteUser`` row is loaded to supply the username and avatar, and
    to act as the fallback for the display name and location when the volunteer
    record does not override them.

    Args:
        session: Database session used to look up the ``LiteUser``.
        volunteer: The volunteer record to convert.

    Returns:
        The populated ``editor_pb2.Volunteer`` message.
    """
    user_lookup = select(LiteUser).where(LiteUser.id == volunteer.user_id)
    lite_user = session.execute(user_lookup).scalar_one()
    board_member_ids = set(get_static_badge_dict()["board_member"])

    # Optional fields stay unset (None) when there is nothing to report.
    avatar_url = None
    if lite_user.avatar_filename:
        avatar_url = urls.media_url(filename=lite_user.avatar_filename, size="thumbnail")

    stopped_on = None
    if volunteer.stopped_volunteering:
        stopped_on = date_to_api(volunteer.stopped_volunteering)

    # Extra keyword fields describing the volunteer's link.
    link_fields = format_volunteer_link(volunteer, lite_user.username)

    return editor_pb2.Volunteer(
        user_id=volunteer.user_id,
        name=volunteer.display_name or lite_user.name,
        username=lite_user.username,
        is_board_member=lite_user.id in board_member_ids,
        role=volunteer.role,
        location=volunteer.display_location or lite_user.city,
        img=avatar_url,
        sort_key=volunteer.sort_key,
        started_volunteering=date_to_api(volunteer.started_volunteering),
        stopped_volunteering=stopped_on,
        show_on_team_page=volunteer.show_on_team_page,
        **link_fields,
    )

66 

67 

def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload) -> None:
    """Send a "new blog post" notification to every visible user.

    Queued as a job by ``Editor.SendBlogPostNotification``. Opens its own database
    session, collects the ids of all users for which ``User.is_visible`` holds, and
    calls ``notify`` once per user.

    Args:
        payload: Carries the blog post's ``url``, ``title`` and ``blurb``. The URL
            is also passed to ``notify`` as the notification key.
    """
    with session_scope() as session:
        visible_users = select(User.id).where(User.is_visible)
        recipient_ids = session.execute(visible_users).scalars().all()

        for recipient_id in recipient_ids:
            blog_post_data = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.general__new_blog_post,
                key=payload.url,
                data=blog_post_data,
            )

83 

84 

85class Editor(editor_pb2_grpc.EditorServicer): 

86 def CreateCommunity( 

87 self, request: editor_pb2.CreateCommunityReq, context: CouchersContext, session: Session 

88 ) -> communities_pb2.Community: 

89 geom = load_community_geom(request.geojson, context) 

90 

91 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None 

92 if parent_node_id is not None: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true

93 parent_node = session.execute(select(Node).where(Node.id == parent_node_id)).scalar_one_or_none() 

94 if not parent_node: 

95 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "parent_node_not_found") 

96 parent_type = parent_node.node_type 

97 else: 

98 parent_type = None 

99 node_type = CHILD_NODE_TYPE[parent_type] 

100 node = create_node(session, geom, parent_node_id, node_type) 

101 create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True) 

102 

103 return community_to_pb(session, node, context) 

104 

105 def UpdateCommunity( 

106 self, request: editor_pb2.UpdateCommunityReq, context: CouchersContext, session: Session 

107 ) -> communities_pb2.Community: 

108 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

109 if not node: 

110 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

111 cluster = node.official_cluster 

112 

113 if request.name: 113 ↛ 116line 113 didn't jump to line 116 because the condition on line 113 was always true

114 cluster.name = request.name 

115 

116 if request.description: 116 ↛ 119line 116 didn't jump to line 119 because the condition on line 116 was always true

117 cluster.description = request.description 

118 

119 if request.geojson: 119 ↛ 124line 119 didn't jump to line 124 because the condition on line 119 was always true

120 geom = load_community_geom(request.geojson, context) 

121 

122 node.geom = from_shape(geom) 

123 

124 if request.parent_node_id != 0: 

125 node.parent_node_id = request.parent_node_id 

126 

127 session.flush() 

128 

129 return community_to_pb(session, cluster.parent_node, context) 

130 

131 def ListEventCommunityInviteRequests( 

132 self, request: editor_pb2.ListEventCommunityInviteRequestsReq, context: CouchersContext, session: Session 

133 ) -> editor_pb2.ListEventCommunityInviteRequestsRes: 

134 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

135 next_request_id = int(request.page_token) if request.page_token else 0 

136 requests = ( 

137 session.execute( 

138 select(EventCommunityInviteRequest) 

139 .where(EventCommunityInviteRequest.approved.is_(None)) 

140 .where(EventCommunityInviteRequest.id >= next_request_id) 

141 .order_by(EventCommunityInviteRequest.id) 

142 .limit(page_size + 1) 

143 ) 

144 .scalars() 

145 .all() 

146 ) 

147 

148 def _request_to_pb(request: EventCommunityInviteRequest) -> editor_pb2.EventCommunityInviteRequest: 

149 users_to_notify, node_id = get_users_to_notify_for_new_event(session, request.occurrence) 

150 return editor_pb2.EventCommunityInviteRequest( 

151 event_community_invite_request_id=request.id, 

152 user_id=request.user_id, 

153 event_url=urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug), 

154 approx_users_to_notify=len(users_to_notify), 

155 community_id=node_id, 

156 ) 

157 

158 return editor_pb2.ListEventCommunityInviteRequestsRes( 

159 requests=[_request_to_pb(request) for request in requests[:page_size]], 

160 next_page_token=str(requests[-1].id) if len(requests) > page_size else None, 

161 ) 

162 

163 def DecideEventCommunityInviteRequest( 

164 self, request: editor_pb2.DecideEventCommunityInviteRequestReq, context: CouchersContext, session: Session 

165 ) -> editor_pb2.DecideEventCommunityInviteRequestRes: 

166 req = session.execute( 

167 select(EventCommunityInviteRequest).where( 

168 EventCommunityInviteRequest.id == request.event_community_invite_request_id 

169 ) 

170 ).scalar_one_or_none() 

171 

172 if not req: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true

173 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_community_invite_not_found") 

174 

175 if req.decided: 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true

176 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_decided") 

177 

178 decided = now() 

179 req.decided = decided 

180 req.decided_by_user_id = context.user_id 

181 req.approved = request.approve 

182 

183 # deny other reqs for the same event 

184 if request.approve: 

185 session.execute( 

186 update(EventCommunityInviteRequest) 

187 .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id) 

188 .where(EventCommunityInviteRequest.decided.is_(None)) 

189 .values(decided=decided, decided_by_user_id=context.user_id, approved=False) 

190 ) 

191 

192 session.flush() 

193 

194 if request.approve: 

195 queue_job( 

196 session, 

197 job=generate_event_create_notifications, 

198 payload=jobs_pb2.GenerateEventCreateNotificationsPayload( 

199 inviting_user_id=req.user_id, 

200 occurrence_id=req.occurrence_id, 

201 approved=True, 

202 ), 

203 ) 

204 

205 return editor_pb2.DecideEventCommunityInviteRequestRes() 

206 

207 def SendBlogPostNotification( 

208 self, request: editor_pb2.SendBlogPostNotificationReq, context: CouchersContext, session: Session 

209 ) -> empty_pb2.Empty: 

210 if len(request.title) > 50: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_title_too_long") 

212 if len(request.blurb) > 100: 212 ↛ 213line 212 didn't jump to line 213 because the condition on line 212 was never true

213 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_blurb_too_long") 

214 queue_job( 

215 session, 

216 job=generate_new_blog_post_notifications, 

217 payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload( 

218 url=request.url, 

219 title=request.title, 

220 blurb=request.blurb, 

221 ), 

222 ) 

223 return empty_pb2.Empty() 

224 

225 def MakeUserVolunteer( 

226 self, request: editor_pb2.MakeUserVolunteerReq, context: CouchersContext, session: Session 

227 ) -> editor_pb2.Volunteer: 

228 # Check if user exists 

229 if not session.execute(select(exists().where(User.id == request.user_id))).scalar(): 

230 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

231 

232 # Check if user is already a volunteer 

233 if session.execute(select(exists().where(Volunteer.user_id == request.user_id))).scalar(): 

234 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_volunteer") 

235 

236 # Parse started_volunteering date 

237 started_volunteering = None 

238 if request.started_volunteering: 

239 started_volunteering = parse_date(request.started_volunteering) 

240 if not started_volunteering: 

241 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date") 

242 

243 # Create a volunteer record 

244 volunteer = Volunteer( 

245 user_id=request.user_id, 

246 role=request.role, 

247 show_on_team_page=not request.hide_on_team_page, 

248 ) 

249 if started_volunteering: 

250 volunteer.started_volunteering = started_volunteering 

251 session.add(volunteer) 

252 session.flush() 

253 

254 return volunteer_to_pb(session, volunteer) 

255 

256 def UpdateVolunteer( 

257 self, request: editor_pb2.UpdateVolunteerReq, context: CouchersContext, session: Session 

258 ) -> editor_pb2.Volunteer: 

259 # Check if volunteer exists 

260 volunteer = session.execute(select(Volunteer).where(Volunteer.user_id == request.user_id)).scalar_one_or_none() 

261 if not volunteer: 

262 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "volunteer_not_found") 

263 

264 # Update role if provided 

265 if request.HasField("role"): 

266 volunteer.role = request.role.value 

267 

268 # Update sort_key if provided 

269 if request.HasField("sort_key"): 

270 volunteer.sort_key = request.sort_key.value 

271 

272 # Update started_volunteering if provided 

273 if request.HasField("started_volunteering"): 

274 started_volunteering = parse_date(request.started_volunteering.value) 

275 if not started_volunteering: 

276 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date") 

277 volunteer.started_volunteering = started_volunteering 

278 

279 # Reinstate (clear stopped_volunteering) or update stopped_volunteering 

280 if request.reinstate_volunteer and request.HasField("stopped_volunteering"): 

281 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "cannot_reinstate_and_set_stopped_date") 

282 if request.reinstate_volunteer: 

283 volunteer.stopped_volunteering = None 

284 elif request.HasField("stopped_volunteering"): 

285 stopped_volunteering = parse_date(request.stopped_volunteering.value) 

286 if not stopped_volunteering: 

287 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_stopped_volunteering_date") 

288 volunteer.stopped_volunteering = stopped_volunteering 

289 

290 # Update show_on_team_page if provided 

291 if request.HasField("show_on_team_page"): 

292 volunteer.show_on_team_page = request.show_on_team_page.value 

293 

294 session.flush() 

295 

296 return volunteer_to_pb(session, volunteer) 

297 

298 def ListVolunteers( 

299 self, request: editor_pb2.ListVolunteersReq, context: CouchersContext, session: Session 

300 ) -> editor_pb2.ListVolunteersRes: 

301 # Query volunteers 

302 query = select(Volunteer).join(LiteUser, LiteUser.id == Volunteer.user_id).where(LiteUser.is_visible) 

303 

304 # Filter based on include_past flag 

305 if not request.include_past: 

306 query = query.where(Volunteer.stopped_volunteering.is_(None)) 

307 

308 # Order by same criteria as public API 

309 query = query.order_by( 

310 Volunteer.sort_key.asc().nulls_last(), 

311 Volunteer.stopped_volunteering.desc().nulls_first(), 

312 Volunteer.started_volunteering.asc(), 

313 ) 

314 

315 volunteers = session.execute(query).scalars().all() 

316 

317 return editor_pb2.ListVolunteersRes( 

318 volunteers=[volunteer_to_pb(session, volunteer) for volunteer in volunteers] 

319 ) 

320 

321 def ListPostcards( 

322 self, request: editor_pb2.ListPostcardsReq, context: CouchersContext, session: Session 

323 ) -> editor_pb2.ListPostcardsRes: 

324 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

325 next_id = int(request.page_token) if request.page_token else None 

326 

327 query = ( 

328 select(PostalVerificationAttempt, User) 

329 .join(User, User.id == PostalVerificationAttempt.user_id) 

330 .order_by(PostalVerificationAttempt.id.desc()) 

331 .limit(page_size + 1) 

332 ) 

333 if next_id is not None: 

334 query = query.where(PostalVerificationAttempt.id <= next_id) 

335 

336 results = session.execute(query).all() 

337 

338 def _attempt_to_pb(attempt: PostalVerificationAttempt, user: User) -> editor_pb2.PostcardInfo: 

339 return editor_pb2.PostcardInfo( 

340 postal_verification_attempt_id=attempt.id, 

341 user_id=attempt.user_id, 

342 username=user.username, 

343 name=user.name, 

344 status=postalverificationstatus2pb.get( 

345 attempt.status, postal_verification_pb2.POSTAL_VERIFICATION_STATUS_UNKNOWN 

346 ), 

347 address=postal_verification_pb2.PostalAddress( 

348 address_line_1=attempt.address_line_1, 

349 address_line_2=attempt.address_line_2, 

350 city=attempt.city, 

351 state=attempt.state, 

352 postal_code=attempt.postal_code, 

353 country_code=attempt.country_code, 

354 ), 

355 created=Timestamp_from_datetime(attempt.created), 

356 postcard_sent_at=Timestamp_from_datetime(attempt.postcard_sent_at) 

357 if attempt.postcard_sent_at 

358 else None, 

359 verified_at=Timestamp_from_datetime(attempt.verified_at) if attempt.verified_at else None, 

360 ) 

361 

362 return editor_pb2.ListPostcardsRes( 

363 postcards=[_attempt_to_pb(attempt, user) for attempt, user in results[:page_size]], 

364 next_page_token=str(results[-1][0].id) if len(results) > page_size else None, 

365 ) 

366 

367 def DownloadPostcardPdf( 

368 self, request: editor_pb2.DownloadPostcardPdfReq, context: CouchersContext, session: Session 

369 ) -> editor_pb2.DownloadPostcardPdfRes: 

370 attempt = session.execute( 

371 select(PostalVerificationAttempt).where( 

372 PostalVerificationAttempt.id == request.postal_verification_attempt_id 

373 ) 

374 ).scalar_one_or_none() 

375 

376 if not attempt: 

377 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "postal_verification_attempt_not_found") 

378 

379 if not attempt.mypostcard_job_id: 

380 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "postcard_not_sent") 

381 

382 pdf_data = download_pdf(attempt.mypostcard_job_id) 

383 return editor_pb2.DownloadPostcardPdfRes(pdf=pdf_data)