Coverage for app/backend/src/couchers/servicers/editor.py: 83%

173 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import json 

2import logging 

3 

4import grpc 

5from geoalchemy2.shape import from_shape 

6from google.protobuf import empty_pb2 

7from shapely.geometry import shape 

8from shapely.geometry.base import BaseGeometry 

9from sqlalchemy import select 

10from sqlalchemy.orm import Session 

11from sqlalchemy.sql import exists, update 

12 

13from couchers import urls 

14from couchers.context import CouchersContext 

15from couchers.db import session_scope 

16from couchers.helpers.clusters import CHILD_NODE_TYPE, create_cluster, create_node 

17from couchers.jobs.enqueue import queue_job 

18from couchers.materialized_views import LiteUser 

19from couchers.models import EventCommunityInviteRequest, Node, User, Volunteer 

20from couchers.models.notifications import NotificationTopicAction 

21from couchers.models.postal_verification import PostalVerificationAttempt 

22from couchers.notifications.notify import notify 

23from couchers.postal.my_postcard import download_pdf 

24from couchers.proto import communities_pb2, editor_pb2, editor_pb2_grpc, notification_data_pb2, postal_verification_pb2 

25from couchers.proto.internal import jobs_pb2 

26from couchers.resources import get_static_badge_dict 

27from couchers.servicers.communities import community_to_pb 

28from couchers.servicers.events import generate_event_create_notifications, get_users_to_notify_for_new_event 

29from couchers.servicers.postal_verification import postalverificationstatus2pb 

30from couchers.servicers.public import format_volunteer_link 

31from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date 

32 

33logger = logging.getLogger(__name__) 

34 

35MAX_PAGINATION_LENGTH = 250 

36 

37 

def load_community_geom(geojson: str, context: CouchersContext) -> BaseGeometry:
    """Decode a GeoJSON string into a geometry and require it to be a MultiPolygon.

    Args:
        geojson: JSON text describing a geometry.
        context: Request context; used to abort the call when the geometry has
            the wrong type.

    Returns:
        The geometry built from the decoded JSON.

    Aborts with ``INVALID_ARGUMENT`` / ``admin:no_multipolygon`` when the
    geometry type is anything other than ``MultiPolygon``.
    """
    decoded = json.loads(geojson)
    geometry = shape(decoded)

    is_multipolygon = geometry.geom_type == "MultiPolygon"
    if not is_multipolygon:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:no_multipolygon")

    return geometry

45 

46 

def volunteer_to_pb(session: Session, volunteer: Volunteer) -> editor_pb2.Volunteer:
    """Build the editor protobuf message for a ``Volunteer`` row.

    The volunteer's ``LiteUser`` row is loaded to supply the username, the
    avatar, and fallbacks for the display name and location.
    """
    user_query = select(LiteUser).where(LiteUser.id == volunteer.user_id)
    user = session.execute(user_query).scalar_one()
    board_member_ids = set(get_static_badge_dict()["board_member"])

    # Optional values are passed as None when there is nothing to show.
    avatar_url = None
    if user.avatar_filename:
        avatar_url = urls.media_url(filename=user.avatar_filename, size="thumbnail")

    started = date_to_api(volunteer.started_volunteering)

    stopped = None
    if volunteer.stopped_volunteering:
        stopped = date_to_api(volunteer.stopped_volunteering)

    link_fields = format_volunteer_link(volunteer, user.username)

    return editor_pb2.Volunteer(
        user_id=volunteer.user_id,
        # Volunteer-specific overrides win over the values from the user row.
        name=volunteer.display_name or user.name,
        username=user.username,
        is_board_member=user.id in board_member_ids,
        role=volunteer.role,
        location=volunteer.display_location or user.city,
        img=avatar_url,
        sort_key=volunteer.sort_key,
        started_volunteering=started,
        stopped_volunteering=stopped,
        show_on_team_page=volunteer.show_on_team_page,
        **link_fields,
    )

66 

67 

def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload) -> None:
    """Job body: call ``notify`` about a new blog post for every visible user.

    Opens its own session, selects the ids of all users for which
    ``User.is_visible`` holds, and notifies each one with the post's URL,
    title and blurb taken from ``payload``. The URL doubles as the
    notification key.
    """
    with session_scope() as session:
        visible_ids_query = select(User.id).where(User.is_visible)
        recipient_ids = session.execute(visible_ids_query).scalars().all()

        for recipient_id in recipient_ids:
            blog_post = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.general__new_blog_post,
                key=payload.url,
                data=blog_post,
            )

83 

84 

class Editor(editor_pb2_grpc.EditorServicer):
    """Servicer implementing the editor RPCs.

    Each RPC receives the request message, a ``CouchersContext`` (used for the
    calling user's id and for aborting with an error code) and a database
    session.
    """

    def CreateCommunity(
        self, request: editor_pb2.CreateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Create a community node and its cluster, and return the new community.

        A ``parent_node_id`` of 0 means "no parent". The node type is looked up
        in ``CHILD_NODE_TYPE`` from the parent's type (``None`` without parent).

        Aborts with ``INVALID_ARGUMENT`` if the GeoJSON is not a MultiPolygon
        and with ``NOT_FOUND`` if the given parent node does not exist.
        """
        geom = load_community_geom(request.geojson, context)

        parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None
        if parent_node_id is not None:
            parent_node = session.execute(select(Node).where(Node.id == parent_node_id)).scalar_one_or_none()
            if not parent_node:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:parent_node_not_found")
            parent_type = parent_node.node_type
        else:
            parent_type = None
        node_type = CHILD_NODE_TYPE[parent_type]
        node = create_node(session, geom, parent_node_id, node_type)
        # NOTE(review): the trailing True is presumably the "official cluster" flag; confirm against create_cluster.
        create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True)

        return community_to_pb(session, node, context)

    def UpdateCommunity(
        self, request: editor_pb2.UpdateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Update the name, description, geometry and/or parent of a community.

        Empty ``name``, ``description`` and ``geojson`` values and a
        ``parent_node_id`` of 0 leave the corresponding value unchanged.

        Aborts with ``NOT_FOUND`` if the community or the requested parent node
        does not exist, and with ``INVALID_ARGUMENT`` if the GeoJSON is not a
        MultiPolygon.
        """
        node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
        if not node:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
        cluster = node.official_cluster

        if request.name:
            cluster.name = request.name

        if request.description:
            cluster.description = request.description

        if request.geojson:
            geom = load_community_geom(request.geojson, context)

            node.geom = from_shape(geom)

        if request.parent_node_id != 0:
            # Check the parent up front, as CreateCommunity does, so an unknown id
            # is reported as NOT_FOUND instead of failing later at flush time.
            parent_exists = session.execute(select(exists().where(Node.id == request.parent_node_id))).scalar()
            if not parent_exists:
                context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:parent_node_not_found")
            node.parent_node_id = request.parent_node_id

        session.flush()

        return community_to_pb(session, cluster.parent_node, context)

    def ListEventCommunityInviteRequests(
        self, request: editor_pb2.ListEventCommunityInviteRequestsReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListEventCommunityInviteRequestsRes:
        """List event community invite requests whose ``approved`` is unset, by ascending id.

        ``page_size`` is capped at ``MAX_PAGINATION_LENGTH`` (also the default
        when it is 0). ``page_token`` is the id of the first request of the
        page; the returned ``next_page_token`` is left unset on the last page.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_request_id = int(request.page_token) if request.page_token else 0
        # One row more than the page size is fetched: its presence tells us there
        # is another page, and its id becomes the next page token.
        invite_requests = (
            session.execute(
                select(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.approved.is_(None))
                .where(EventCommunityInviteRequest.id >= next_request_id)
                .order_by(EventCommunityInviteRequest.id)
                .limit(page_size + 1)
            )
            .scalars()
            .all()
        )

        def _request_to_pb(invite_request: EventCommunityInviteRequest) -> editor_pb2.EventCommunityInviteRequest:
            """Convert one invite request row, including how many users would be notified."""
            users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite_request.occurrence)
            return editor_pb2.EventCommunityInviteRequest(
                event_community_invite_request_id=invite_request.id,
                user_id=invite_request.user_id,
                event_url=urls.event_link(
                    occurrence_id=invite_request.occurrence.id, slug=invite_request.occurrence.event.slug
                ),
                approx_users_to_notify=len(users_to_notify),
                community_id=node_id,
            )

        return editor_pb2.ListEventCommunityInviteRequestsRes(
            requests=[_request_to_pb(invite_request) for invite_request in invite_requests[:page_size]],
            next_page_token=str(invite_requests[-1].id) if len(invite_requests) > page_size else None,
        )

    def DecideEventCommunityInviteRequest(
        self, request: editor_pb2.DecideEventCommunityInviteRequestReq, context: CouchersContext, session: Session
    ) -> editor_pb2.DecideEventCommunityInviteRequestRes:
        """Approve or deny an event community invite request.

        The decision time and the deciding user are recorded on the request.
        On approval, every other still-undecided request for the same
        occurrence is marked as denied and a job generating the event
        notifications is queued.

        Aborts with ``NOT_FOUND`` if the request does not exist and with
        ``FAILED_PRECONDITION`` if it has already been decided.
        """
        req = session.execute(
            select(EventCommunityInviteRequest).where(
                EventCommunityInviteRequest.id == request.event_community_invite_request_id
            )
        ).scalar_one_or_none()

        if not req:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:event_community_invite_not_found")

        if req.decided:
            context.abort_with_error_code(
                grpc.StatusCode.FAILED_PRECONDITION, "admin:event_community_invite_already_decided"
            )

        decided = now()
        req.decided = decided
        req.decided_by_user_id = context.user_id
        req.approved = request.approve

        # deny other reqs for the same event
        if request.approve:
            session.execute(
                update(EventCommunityInviteRequest)
                .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id)
                # Exclude the request being approved explicitly, rather than relying
                # on its pending `decided` value having been flushed before this runs.
                .where(EventCommunityInviteRequest.id != req.id)
                .where(EventCommunityInviteRequest.decided.is_(None))
                .values(decided=decided, decided_by_user_id=context.user_id, approved=False)
            )

        session.flush()

        if request.approve:
            queue_job(
                session,
                job=generate_event_create_notifications,
                payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
                    inviting_user_id=req.user_id,
                    occurrence_id=req.occurrence_id,
                    approved=True,
                ),
            )

        return editor_pb2.DecideEventCommunityInviteRequestRes()

    def SendBlogPostNotification(
        self, request: editor_pb2.SendBlogPostNotificationReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Queue the job that notifies users about a new blog post.

        Aborts with ``FAILED_PRECONDITION`` if the title is longer than 50
        characters or the blurb is longer than 100 characters.
        """
        if len(request.title) > 50:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:blog_title_too_long")
        if len(request.blurb) > 100:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:blog_blurb_too_long")
        queue_job(
            session,
            job=generate_new_blog_post_notifications,
            payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload(
                url=request.url,
                title=request.title,
                blurb=request.blurb,
            ),
        )
        return empty_pb2.Empty()

    def MakeUserVolunteer(
        self, request: editor_pb2.MakeUserVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Create a volunteer record for an existing user and return it.

        Aborts with ``NOT_FOUND`` if the user does not exist, with
        ``FAILED_PRECONDITION`` if the user already has a volunteer record, and
        with ``INVALID_ARGUMENT`` if ``started_volunteering`` is given but
        cannot be parsed as a date.
        """
        # Check if user exists
        if not session.execute(select(exists().where(User.id == request.user_id))).scalar():
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        # Check if user is already a volunteer
        if session.execute(select(exists().where(Volunteer.user_id == request.user_id))).scalar():
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_already_volunteer")

        # Parse started_volunteering date
        started_volunteering = None
        if request.started_volunteering:
            started_volunteering = parse_date(request.started_volunteering)
            if not started_volunteering:
                context.abort_with_error_code(
                    grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_started_volunteering_date"
                )

        # Create a volunteer record
        volunteer = Volunteer(
            user_id=request.user_id,
            role=request.role,
            show_on_team_page=not request.hide_on_team_page,
        )
        # Only set the start date when one was supplied; otherwise the attribute
        # is left untouched (presumably so a model default applies; confirm).
        if started_volunteering:
            volunteer.started_volunteering = started_volunteering
        session.add(volunteer)
        session.flush()

        return volunteer_to_pb(session, volunteer)

    def UpdateVolunteer(
        self, request: editor_pb2.UpdateVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Update the fields of a volunteer record that are present in the request.

        Fields not set on the request are left unchanged. ``reinstate_volunteer``
        clears ``stopped_volunteering`` and cannot be combined with a new
        ``stopped_volunteering`` value.

        Aborts with ``NOT_FOUND`` if the user has no volunteer record and with
        ``INVALID_ARGUMENT`` for an unparseable date or for the conflicting
        combination described above.
        """
        # Check if volunteer exists
        volunteer = session.execute(select(Volunteer).where(Volunteer.user_id == request.user_id)).scalar_one_or_none()
        if not volunteer:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:volunteer_not_found")

        # Update role if provided
        if request.HasField("role"):
            volunteer.role = request.role.value

        # Update sort_key if provided
        if request.HasField("sort_key"):
            volunteer.sort_key = request.sort_key.value

        # Update started_volunteering if provided
        if request.HasField("started_volunteering"):
            started_volunteering = parse_date(request.started_volunteering.value)
            if not started_volunteering:
                context.abort_with_error_code(
                    grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_started_volunteering_date"
                )
            volunteer.started_volunteering = started_volunteering

        # Reinstate (clear stopped_volunteering) or update stopped_volunteering
        if request.reinstate_volunteer and request.HasField("stopped_volunteering"):
            context.abort_with_error_code(
                grpc.StatusCode.INVALID_ARGUMENT, "admin:cannot_reinstate_and_set_stopped_date"
            )
        if request.reinstate_volunteer:
            volunteer.stopped_volunteering = None
        elif request.HasField("stopped_volunteering"):
            stopped_volunteering = parse_date(request.stopped_volunteering.value)
            if not stopped_volunteering:
                context.abort_with_error_code(
                    grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_stopped_volunteering_date"
                )
            volunteer.stopped_volunteering = stopped_volunteering

        # Update show_on_team_page if provided
        if request.HasField("show_on_team_page"):
            volunteer.show_on_team_page = request.show_on_team_page.value

        session.flush()

        return volunteer_to_pb(session, volunteer)

    def ListVolunteers(
        self, request: editor_pb2.ListVolunteersReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListVolunteersRes:
        """List volunteers whose user is visible.

        Volunteers with a ``stopped_volunteering`` date are only included when
        ``include_past`` is set.
        """
        # Query volunteers
        query = select(Volunteer).join(LiteUser, LiteUser.id == Volunteer.user_id).where(LiteUser.is_visible)

        # Filter based on include_past flag
        if not request.include_past:
            query = query.where(Volunteer.stopped_volunteering.is_(None))

        # Order by same criteria as public API
        query = query.order_by(
            Volunteer.sort_key.asc().nulls_last(),
            Volunteer.stopped_volunteering.desc().nulls_first(),
            Volunteer.started_volunteering.asc(),
        )

        volunteers = session.execute(query).scalars().all()

        return editor_pb2.ListVolunteersRes(
            volunteers=[volunteer_to_pb(session, volunteer) for volunteer in volunteers]
        )

    def ListPostcards(
        self, request: editor_pb2.ListPostcardsReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListPostcardsRes:
        """List postal verification attempts with their users, by descending attempt id.

        ``page_size`` is capped at ``MAX_PAGINATION_LENGTH`` (also the default
        when it is 0). ``page_token`` is the id of the first attempt of the
        page; the returned ``next_page_token`` is left unset on the last page.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        next_id = int(request.page_token) if request.page_token else None

        # One row more than the page size is fetched: its presence tells us there
        # is another page, and its id becomes the next page token.
        query = (
            select(PostalVerificationAttempt, User)
            .join(User, User.id == PostalVerificationAttempt.user_id)
            .order_by(PostalVerificationAttempt.id.desc())
            .limit(page_size + 1)
        )
        if next_id is not None:
            query = query.where(PostalVerificationAttempt.id <= next_id)

        results = session.execute(query).all()

        def _attempt_to_pb(attempt: PostalVerificationAttempt, user: User) -> editor_pb2.PostcardInfo:
            """Convert one attempt and its user; unset timestamps are passed as None."""
            return editor_pb2.PostcardInfo(
                postal_verification_attempt_id=attempt.id,
                user_id=attempt.user_id,
                username=user.username,
                name=user.name,
                status=postalverificationstatus2pb.get(
                    attempt.status, postal_verification_pb2.POSTAL_VERIFICATION_STATUS_UNKNOWN
                ),
                address=postal_verification_pb2.PostalAddress(
                    address_line_1=attempt.address_line_1,
                    address_line_2=attempt.address_line_2,
                    city=attempt.city,
                    state=attempt.state,
                    postal_code=attempt.postal_code,
                    country_code=attempt.country_code,
                ),
                created=Timestamp_from_datetime(attempt.created),
                postcard_sent_at=Timestamp_from_datetime(attempt.postcard_sent_at)
                if attempt.postcard_sent_at
                else None,
                verified_at=Timestamp_from_datetime(attempt.verified_at) if attempt.verified_at else None,
            )

        return editor_pb2.ListPostcardsRes(
            postcards=[_attempt_to_pb(attempt, user) for attempt, user in results[:page_size]],
            next_page_token=str(results[-1][0].id) if len(results) > page_size else None,
        )

    def DownloadPostcardPdf(
        self, request: editor_pb2.DownloadPostcardPdfReq, context: CouchersContext, session: Session
    ) -> editor_pb2.DownloadPostcardPdfRes:
        """Return the PDF of the postcard belonging to a postal verification attempt.

        Aborts with ``NOT_FOUND`` if the attempt does not exist and with
        ``FAILED_PRECONDITION`` if the attempt has no ``mypostcard_job_id``.
        """
        attempt = session.execute(
            select(PostalVerificationAttempt).where(
                PostalVerificationAttempt.id == request.postal_verification_attempt_id
            )
        ).scalar_one_or_none()

        if not attempt:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "postal_verification_attempt_not_found")

        if not attempt.mypostcard_job_id:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:postcard_not_sent")

        pdf_data = download_pdf(attempt.mypostcard_job_id)
        return editor_pb2.DownloadPostcardPdfRes(pdf=pdf_data)

391 

392 pdf_data = download_pdf(attempt.mypostcard_job_id) 

393 return editor_pb2.DownloadPostcardPdfRes(pdf=pdf_data)