Coverage for app / backend / src / couchers / servicers / editor.py: 91%

148 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import json 

2import logging 

3 

4import grpc 

5from geoalchemy2.shape import from_shape 

6from google.protobuf import empty_pb2 

7from shapely.geometry import shape 

8from shapely.geometry.base import BaseGeometry 

9from sqlalchemy import select 

10from sqlalchemy.orm import Session 

11from sqlalchemy.sql import exists, update 

12 

13from couchers import urls 

14from couchers.context import CouchersContext 

15from couchers.db import session_scope 

16from couchers.helpers.clusters import CHILD_NODE_TYPE, create_cluster, create_node 

17from couchers.jobs.enqueue import queue_job 

18from couchers.materialized_views import LiteUser 

19from couchers.models import EventCommunityInviteRequest, Node, User, Volunteer 

20from couchers.models.notifications import NotificationTopicAction 

21from couchers.notifications.notify import notify 

22from couchers.proto import communities_pb2, editor_pb2, editor_pb2_grpc, notification_data_pb2 

23from couchers.proto.internal import jobs_pb2 

24from couchers.resources import get_static_badge_dict 

25from couchers.servicers.communities import community_to_pb 

26from couchers.servicers.events import generate_event_create_notifications, get_users_to_notify_for_new_event 

27from couchers.servicers.public import format_volunteer_link 

28from couchers.utils import date_to_api, now, parse_date 

29 

30logger = logging.getLogger(__name__) 

31 

32MAX_PAGINATION_LENGTH = 250 

33 

34 

35def load_community_geom(geojson: str, context: CouchersContext) -> BaseGeometry: 

36 geom = shape(json.loads(geojson)) 

37 

38 if geom.geom_type != "MultiPolygon": 

39 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_multipolygon") 

40 

41 return geom 

42 

43 

44def volunteer_to_pb(session: Session, volunteer: Volunteer) -> editor_pb2.Volunteer: 

45 """Convert a Volunteer model to the editor protobuf message.""" 

46 lite_user = session.execute(select(LiteUser).where(LiteUser.id == volunteer.user_id)).scalar_one() 

47 board_members = set(get_static_badge_dict()["board_member"]) 

48 

49 return editor_pb2.Volunteer( 

50 user_id=volunteer.user_id, 

51 name=volunteer.display_name or lite_user.name, 

52 username=lite_user.username, 

53 is_board_member=lite_user.id in board_members, 

54 role=volunteer.role, 

55 location=volunteer.display_location or lite_user.city, 

56 img=urls.media_url(filename=lite_user.avatar_filename, size="thumbnail") if lite_user.avatar_filename else None, 

57 sort_key=volunteer.sort_key, 

58 started_volunteering=date_to_api(volunteer.started_volunteering), 

59 stopped_volunteering=date_to_api(volunteer.stopped_volunteering) if volunteer.stopped_volunteering else None, 

60 show_on_team_page=volunteer.show_on_team_page, 

61 **format_volunteer_link(volunteer, lite_user.username), 

62 ) 

63 

64 

65def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload) -> None: 

66 with session_scope() as session: 

67 all_users_ids = session.execute(select(User.id).where(User.is_visible)).scalars().all() 

68 for user_id in all_users_ids: 

69 notify( 

70 session, 

71 user_id=user_id, 

72 topic_action=NotificationTopicAction.general__new_blog_post, 

73 key=payload.url, 

74 data=notification_data_pb2.GeneralNewBlogPost( 

75 url=payload.url, 

76 title=payload.title, 

77 blurb=payload.blurb, 

78 ), 

79 ) 

80 

81 

82class Editor(editor_pb2_grpc.EditorServicer): 

83 def CreateCommunity( 

84 self, request: editor_pb2.CreateCommunityReq, context: CouchersContext, session: Session 

85 ) -> communities_pb2.Community: 

86 geom = load_community_geom(request.geojson, context) 

87 

88 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None 

89 if parent_node_id is not None: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 parent_node = session.execute(select(Node).where(Node.id == parent_node_id)).scalar_one_or_none() 

91 if not parent_node: 

92 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "parent_node_not_found") 

93 parent_type = parent_node.node_type 

94 else: 

95 parent_type = None 

96 node_type = CHILD_NODE_TYPE[parent_type] 

97 node = create_node(session, geom, parent_node_id, node_type) 

98 create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True) 

99 

100 return community_to_pb(session, node, context) 

101 

102 def UpdateCommunity( 

103 self, request: editor_pb2.UpdateCommunityReq, context: CouchersContext, session: Session 

104 ) -> communities_pb2.Community: 

105 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

106 if not node: 

107 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found") 

108 cluster = node.official_cluster 

109 

110 if request.name: 110 ↛ 113line 110 didn't jump to line 113 because the condition on line 110 was always true

111 cluster.name = request.name 

112 

113 if request.description: 113 ↛ 116line 113 didn't jump to line 116 because the condition on line 113 was always true

114 cluster.description = request.description 

115 

116 if request.geojson: 116 ↛ 121line 116 didn't jump to line 121 because the condition on line 116 was always true

117 geom = load_community_geom(request.geojson, context) 

118 

119 node.geom = from_shape(geom) 

120 

121 if request.parent_node_id != 0: 

122 node.parent_node_id = request.parent_node_id 

123 

124 session.flush() 

125 

126 return community_to_pb(session, cluster.parent_node, context) 

127 

128 def ListEventCommunityInviteRequests( 

129 self, request: editor_pb2.ListEventCommunityInviteRequestsReq, context: CouchersContext, session: Session 

130 ) -> editor_pb2.ListEventCommunityInviteRequestsRes: 

131 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

132 next_request_id = int(request.page_token) if request.page_token else 0 

133 requests = ( 

134 session.execute( 

135 select(EventCommunityInviteRequest) 

136 .where(EventCommunityInviteRequest.approved.is_(None)) 

137 .where(EventCommunityInviteRequest.id >= next_request_id) 

138 .order_by(EventCommunityInviteRequest.id) 

139 .limit(page_size + 1) 

140 ) 

141 .scalars() 

142 .all() 

143 ) 

144 

145 def _request_to_pb(request: EventCommunityInviteRequest) -> editor_pb2.EventCommunityInviteRequest: 

146 users_to_notify, node_id = get_users_to_notify_for_new_event(session, request.occurrence) 

147 return editor_pb2.EventCommunityInviteRequest( 

148 event_community_invite_request_id=request.id, 

149 user_id=request.user_id, 

150 event_url=urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug), 

151 approx_users_to_notify=len(users_to_notify), 

152 community_id=node_id, 

153 ) 

154 

155 return editor_pb2.ListEventCommunityInviteRequestsRes( 

156 requests=[_request_to_pb(request) for request in requests[:page_size]], 

157 next_page_token=str(requests[-1].id) if len(requests) > page_size else None, 

158 ) 

159 

160 def DecideEventCommunityInviteRequest( 

161 self, request: editor_pb2.DecideEventCommunityInviteRequestReq, context: CouchersContext, session: Session 

162 ) -> editor_pb2.DecideEventCommunityInviteRequestRes: 

163 req = session.execute( 

164 select(EventCommunityInviteRequest).where( 

165 EventCommunityInviteRequest.id == request.event_community_invite_request_id 

166 ) 

167 ).scalar_one_or_none() 

168 

169 if not req: 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true

170 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_community_invite_not_found") 

171 

172 if req.decided: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true

173 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_decided") 

174 

175 decided = now() 

176 req.decided = decided 

177 req.decided_by_user_id = context.user_id 

178 req.approved = request.approve 

179 

180 # deny other reqs for the same event 

181 if request.approve: 

182 session.execute( 

183 update(EventCommunityInviteRequest) 

184 .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id) 

185 .where(EventCommunityInviteRequest.decided.is_(None)) 

186 .values(decided=decided, decided_by_user_id=context.user_id, approved=False) 

187 ) 

188 

189 session.flush() 

190 

191 if request.approve: 

192 queue_job( 

193 session, 

194 job=generate_event_create_notifications, 

195 payload=jobs_pb2.GenerateEventCreateNotificationsPayload( 

196 inviting_user_id=req.user_id, 

197 occurrence_id=req.occurrence_id, 

198 approved=True, 

199 ), 

200 ) 

201 

202 return editor_pb2.DecideEventCommunityInviteRequestRes() 

203 

204 def SendBlogPostNotification( 

205 self, request: editor_pb2.SendBlogPostNotificationReq, context: CouchersContext, session: Session 

206 ) -> empty_pb2.Empty: 

207 if len(request.title) > 50: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_title_too_long") 

209 if len(request.blurb) > 100: 209 ↛ 210line 209 didn't jump to line 210 because the condition on line 209 was never true

210 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_blurb_too_long") 

211 queue_job( 

212 session, 

213 job=generate_new_blog_post_notifications, 

214 payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload( 

215 url=request.url, 

216 title=request.title, 

217 blurb=request.blurb, 

218 ), 

219 ) 

220 return empty_pb2.Empty() 

221 

222 def MakeUserVolunteer( 

223 self, request: editor_pb2.MakeUserVolunteerReq, context: CouchersContext, session: Session 

224 ) -> editor_pb2.Volunteer: 

225 # Check if user exists 

226 if not session.execute(select(exists().where(User.id == request.user_id))).scalar(): 

227 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

228 

229 # Check if user is already a volunteer 

230 if session.execute(select(exists().where(Volunteer.user_id == request.user_id))).scalar(): 

231 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_volunteer") 

232 

233 # Parse started_volunteering date 

234 started_volunteering = None 

235 if request.started_volunteering: 

236 started_volunteering = parse_date(request.started_volunteering) 

237 if not started_volunteering: 

238 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date") 

239 

240 # Create a volunteer record 

241 volunteer = Volunteer( 

242 user_id=request.user_id, 

243 role=request.role, 

244 show_on_team_page=not request.hide_on_team_page, 

245 ) 

246 if started_volunteering: 

247 volunteer.started_volunteering = started_volunteering 

248 session.add(volunteer) 

249 session.flush() 

250 

251 return volunteer_to_pb(session, volunteer) 

252 

253 def UpdateVolunteer( 

254 self, request: editor_pb2.UpdateVolunteerReq, context: CouchersContext, session: Session 

255 ) -> editor_pb2.Volunteer: 

256 # Check if volunteer exists 

257 volunteer = session.execute(select(Volunteer).where(Volunteer.user_id == request.user_id)).scalar_one_or_none() 

258 if not volunteer: 

259 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "volunteer_not_found") 

260 

261 # Update role if provided 

262 if request.HasField("role"): 

263 volunteer.role = request.role.value 

264 

265 # Update sort_key if provided 

266 if request.HasField("sort_key"): 

267 volunteer.sort_key = request.sort_key.value 

268 

269 # Update started_volunteering if provided 

270 if request.HasField("started_volunteering"): 

271 started_volunteering = parse_date(request.started_volunteering.value) 

272 if not started_volunteering: 

273 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date") 

274 volunteer.started_volunteering = started_volunteering 

275 

276 # Update stopped_volunteering if provided 

277 if request.HasField("stopped_volunteering"): 

278 stopped_volunteering = parse_date(request.stopped_volunteering.value) 

279 if not stopped_volunteering: 

280 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_stopped_volunteering_date") 

281 volunteer.stopped_volunteering = stopped_volunteering 

282 

283 # Update show_on_team_page if provided 

284 if request.HasField("show_on_team_page"): 

285 volunteer.show_on_team_page = request.show_on_team_page.value 

286 

287 session.flush() 

288 

289 return volunteer_to_pb(session, volunteer) 

290 

291 def ListVolunteers( 

292 self, request: editor_pb2.ListVolunteersReq, context: CouchersContext, session: Session 

293 ) -> editor_pb2.ListVolunteersRes: 

294 # Query volunteers 

295 query = select(Volunteer).join(LiteUser, LiteUser.id == Volunteer.user_id).where(LiteUser.is_visible) 

296 

297 # Filter based on include_past flag 

298 if not request.include_past: 

299 query = query.where(Volunteer.stopped_volunteering.is_(None)) 

300 

301 # Order by same criteria as public API 

302 query = query.order_by( 

303 Volunteer.sort_key.asc().nulls_last(), 

304 Volunteer.stopped_volunteering.desc().nulls_first(), 

305 Volunteer.started_volunteering.asc(), 

306 ) 

307 

308 volunteers = session.execute(query).scalars().all() 

309 

310 return editor_pb2.ListVolunteersRes( 

311 volunteers=[volunteer_to_pb(session, volunteer) for volunteer in volunteers] 

312 )