Coverage for app/backend/src/couchers/servicers/editor.py: 94%

141 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import json 

2import logging 

3 

4import grpc 

5from geoalchemy2.shape import from_shape 

6from google.protobuf import empty_pb2 

7from shapely.geometry import shape 

8from shapely.geometry.base import BaseGeometry 

9from sqlalchemy import select 

10from sqlalchemy.orm import Session 

11from sqlalchemy.sql import exists, update 

12 

13from couchers import urls 

14from couchers.context import CouchersContext 

15from couchers.db import session_scope 

16from couchers.helpers.clusters import create_cluster, create_node 

17from couchers.jobs.enqueue import queue_job 

18from couchers.materialized_views import LiteUser 

19from couchers.models import EventCommunityInviteRequest, Node, User, Volunteer 

20from couchers.models.notifications import NotificationTopicAction 

21from couchers.notifications.notify import notify 

22from couchers.proto import communities_pb2, editor_pb2, editor_pb2_grpc, notification_data_pb2 

23from couchers.proto.internal import jobs_pb2 

24from couchers.resources import get_static_badge_dict 

25from couchers.servicers.communities import community_to_pb 

26from couchers.servicers.events import generate_event_create_notifications, get_users_to_notify_for_new_event 

27from couchers.servicers.public import format_volunteer_link 

28from couchers.utils import date_to_api, now, parse_date 

29 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Upper bound on (and default for) the page size of paginated listings in this module.
MAX_PAGINATION_LENGTH = 250

33 

34 

def load_community_geom(geojson: str, context: CouchersContext) -> BaseGeometry:
    """Parse a GeoJSON string into a shapely geometry, requiring a MultiPolygon.

    Args:
        geojson: A GeoJSON geometry encoded as a JSON string.
        context: Request context, used to abort the RPC on invalid input.

    Returns:
        The parsed geometry; its ``geom_type`` is ``"MultiPolygon"``.

    Aborts with INVALID_ARGUMENT ("no_multipolygon") when the string is not valid
    JSON (including the empty string) or when the geometry is not a MultiPolygon.
    """
    try:
        geojson_obj = json.loads(geojson)
    except json.JSONDecodeError:
        # An empty or malformed payload is a client error, not a server fault. It cannot
        # describe a MultiPolygon, so the existing error code is reused rather than letting
        # the decode error escape as an unhandled exception.
        # NOTE(review): abort_with_error_code is expected to raise, as the callers in this
        # module already assume — confirm.
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_multipolygon")

    geom = shape(geojson_obj)

    if geom.geom_type != "MultiPolygon":
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "no_multipolygon")

    return geom

42 

43 

def volunteer_to_pb(session: Session, volunteer: Volunteer) -> editor_pb2.Volunteer:
    """Build the editor protobuf message for a Volunteer row.

    The matching LiteUser is looked up by ``volunteer.user_id`` (exactly one row is
    required by ``scalar_one``). The volunteer's display name and display location
    take precedence; the LiteUser's name and city are used when those are falsy.
    """
    user_query = select(LiteUser).where(LiteUser.id == volunteer.user_id)
    lite_user = session.execute(user_query).scalar_one()
    board_member_ids = set(get_static_badge_dict()["board_member"])

    # Optional fields stay None when there is nothing to show.
    avatar_url = None
    if lite_user.avatar_filename:
        avatar_url = urls.media_url(filename=lite_user.avatar_filename, size="thumbnail")

    stopped_on = None
    if volunteer.stopped_volunteering:
        stopped_on = date_to_api(volunteer.stopped_volunteering)

    return editor_pb2.Volunteer(
        user_id=volunteer.user_id,
        name=volunteer.display_name or lite_user.name,
        username=lite_user.username,
        is_board_member=lite_user.id in board_member_ids,
        role=volunteer.role,
        location=volunteer.display_location or lite_user.city,
        img=avatar_url,
        sort_key=volunteer.sort_key,
        started_volunteering=date_to_api(volunteer.started_volunteering),
        stopped_volunteering=stopped_on,
        show_on_team_page=volunteer.show_on_team_page,
        **format_volunteer_link(volunteer, lite_user.username),
    )

63 

64 

def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload) -> None:
    """Send a "new blog post" notification to every visible user.

    Opens its own session, selects the ids of all users where ``User.is_visible``
    holds, and calls ``notify`` once per user with the post's url, title and blurb.
    The post url is used as the notification key.
    """
    with session_scope() as session:
        visible_user_ids = session.execute(select(User.id).where(User.is_visible)).scalars().all()
        for recipient_id in visible_user_ids:
            # A fresh data message is built for each recipient.
            blog_post_data = notification_data_pb2.GeneralNewBlogPost(
                url=payload.url,
                title=payload.title,
                blurb=payload.blurb,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.general__new_blog_post,
                key=payload.url,
                data=blog_post_data,
            )

80 

81 

class Editor(editor_pb2_grpc.EditorServicer):
    """Servicer for the Editor API: communities, event invite requests, blog post notifications and volunteers."""

    def CreateCommunity(
        self, request: editor_pb2.CreateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Create a node from the request's GeoJSON, create a cluster for it, and return the community.

        The geometry is validated by ``load_community_geom``, which aborts the call
        when it is not a MultiPolygon.
        """
        community_geom = load_community_geom(request.geojson, context)

        # A parent_node_id of 0 is passed on as None.
        parent_id = request.parent_node_id or None
        new_node = create_node(session, community_geom, parent_id)
        create_cluster(
            session, new_node.id, request.name, request.description, context.user_id, request.admin_ids, True
        )

        return community_to_pb(session, new_node, context)

    def UpdateCommunity(
        self, request: editor_pb2.UpdateCommunityReq, context: CouchersContext, session: Session
    ) -> communities_pb2.Community:
        """Apply the provided fields of the request to an existing community.

        Only fields carrying a truthy value (non-empty string, non-zero id) are written;
        everything else is left as is. Aborts with NOT_FOUND ("community_not_found")
        when no node has the requested id.
        """
        node_query = select(Node).where(Node.id == request.community_id)
        node = session.execute(node_query).scalar_one_or_none()
        if not node:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "community_not_found")
        official_cluster = node.official_cluster

        # Name and description are stored on the cluster.
        if request.name:
            official_cluster.name = request.name

        if request.description:
            official_cluster.description = request.description

        # Geometry and parent are stored on the node.
        if request.geojson:
            node.geom = from_shape(load_community_geom(request.geojson, context))

        if request.parent_node_id:
            node.parent_node_id = request.parent_node_id

        session.flush()

        return community_to_pb(session, official_cluster.parent_node, context)

    def ListEventCommunityInviteRequests(
        self, request: editor_pb2.ListEventCommunityInviteRequestsReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListEventCommunityInviteRequestsRes:
        """List invite requests whose ``approved`` is still NULL, ordered by id and paginated.

        The page token is the id of the first request of the page. One row more than
        the page size is fetched; when present, its id becomes the next page token.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        first_id = int(request.page_token) if request.page_token else 0

        pending_query = (
            select(EventCommunityInviteRequest)
            .where(EventCommunityInviteRequest.approved.is_(None))
            .where(EventCommunityInviteRequest.id >= first_id)
            .order_by(EventCommunityInviteRequest.id)
            .limit(page_size + 1)
        )
        pending = session.execute(pending_query).scalars().all()

        def _request_to_pb(invite_request: EventCommunityInviteRequest) -> editor_pb2.EventCommunityInviteRequest:
            occurrence = invite_request.occurrence
            users_to_notify, node_id = get_users_to_notify_for_new_event(session, occurrence)
            return editor_pb2.EventCommunityInviteRequest(
                event_community_invite_request_id=invite_request.id,
                user_id=invite_request.user_id,
                event_url=urls.event_link(occurrence_id=occurrence.id, slug=occurrence.event.slug),
                approx_users_to_notify=len(users_to_notify),
                community_id=node_id,
            )

        # The extra row is never returned; it only signals that another page exists.
        has_next_page = len(pending) > page_size

        return editor_pb2.ListEventCommunityInviteRequestsRes(
            requests=[_request_to_pb(invite_request) for invite_request in pending[:page_size]],
            next_page_token=str(pending[-1].id) if has_next_page else None,
        )

    def DecideEventCommunityInviteRequest(
        self, request: editor_pb2.DecideEventCommunityInviteRequestReq, context: CouchersContext, session: Session
    ) -> editor_pb2.DecideEventCommunityInviteRequestRes:
        """Record an approve/deny decision on an event community invite request.

        On approval, every other undecided request for the same occurrence is marked
        as denied with the same timestamp and deciding user, and a job is queued to
        generate the event notifications.

        Aborts with NOT_FOUND ("event_community_invite_not_found") when the request does
        not exist, and with FAILED_PRECONDITION ("event_community_invite_already_decided")
        when it already has a decision timestamp.
        """
        lookup = select(EventCommunityInviteRequest).where(
            EventCommunityInviteRequest.id == request.event_community_invite_request_id
        )
        invite_request = session.execute(lookup).scalar_one_or_none()

        if not invite_request:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_community_invite_not_found")

        if invite_request.decided:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_decided")

        decided_at = now()
        invite_request.decided = decided_at
        invite_request.decided_by_user_id = context.user_id
        invite_request.approved = request.approve

        if not request.approve:
            # A denial has no follow-up work.
            session.flush()
            return editor_pb2.DecideEventCommunityInviteRequestRes()

        # deny other reqs for the same event
        session.execute(
            update(EventCommunityInviteRequest)
            .where(EventCommunityInviteRequest.occurrence_id == invite_request.occurrence_id)
            .where(EventCommunityInviteRequest.decided.is_(None))
            .values(decided=decided_at, decided_by_user_id=context.user_id, approved=False)
        )

        session.flush()

        notification_payload = jobs_pb2.GenerateEventCreateNotificationsPayload(
            inviting_user_id=invite_request.user_id,
            occurrence_id=invite_request.occurrence_id,
            approved=True,
        )
        queue_job(
            session,
            job=generate_event_create_notifications,
            payload=notification_payload,
        )

        return editor_pb2.DecideEventCommunityInviteRequestRes()

    def SendBlogPostNotification(
        self, request: editor_pb2.SendBlogPostNotificationReq, context: CouchersContext, session: Session
    ) -> empty_pb2.Empty:
        """Queue the job that sends a "new blog post" notification.

        Aborts with FAILED_PRECONDITION when the title is longer than 50 characters
        ("admin_blog_title_too_long") or the blurb is longer than 100 characters
        ("admin_blog_blurb_too_long").
        """
        if len(request.title) > 50:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_title_too_long")
        if len(request.blurb) > 100:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_blog_blurb_too_long")

        job_payload = jobs_pb2.GenerateNewBlogPostNotificationsPayload(
            url=request.url,
            title=request.title,
            blurb=request.blurb,
        )
        queue_job(
            session,
            job=generate_new_blog_post_notifications,
            payload=job_payload,
        )
        return empty_pb2.Empty()

    def MakeUserVolunteer(
        self, request: editor_pb2.MakeUserVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Create a Volunteer record for an existing user and return it.

        Aborts with NOT_FOUND ("user_not_found") when the user does not exist, with
        FAILED_PRECONDITION ("user_already_volunteer") when a Volunteer row already
        exists for the user, and with INVALID_ARGUMENT
        ("invalid_started_volunteering_date") when a start date is given but
        ``parse_date`` returns a falsy value for it.
        """
        # The user must exist...
        user_exists = session.execute(select(exists().where(User.id == request.user_id))).scalar()
        if not user_exists:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        # ...and must not already be a volunteer.
        already_volunteer = session.execute(select(exists().where(Volunteer.user_id == request.user_id))).scalar()
        if already_volunteer:
            context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_volunteer")

        # The start date is optional; when given it has to parse.
        started_on = None
        if request.started_volunteering:
            started_on = parse_date(request.started_volunteering)
            if not started_on:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date")

        new_volunteer = Volunteer(
            user_id=request.user_id,
            role=request.role,
            show_on_team_page=not request.hide_on_team_page,
        )
        # The attribute is only assigned when a date was supplied.
        if started_on:
            new_volunteer.started_volunteering = started_on
        session.add(new_volunteer)
        session.flush()

        return volunteer_to_pb(session, new_volunteer)

    def UpdateVolunteer(
        self, request: editor_pb2.UpdateVolunteerReq, context: CouchersContext, session: Session
    ) -> editor_pb2.Volunteer:
        """Update the fields of a Volunteer that are present on the request and return it.

        A field is changed only when ``request.HasField`` reports it as set. Aborts with
        NOT_FOUND ("volunteer_not_found") when the user has no Volunteer row, and with
        INVALID_ARGUMENT when a supplied start or stop date does not parse
        ("invalid_started_volunteering_date" / "invalid_stopped_volunteering_date").
        """
        volunteer_query = select(Volunteer).where(Volunteer.user_id == request.user_id)
        volunteer = session.execute(volunteer_query).scalar_one_or_none()
        if not volunteer:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "volunteer_not_found")

        if request.HasField("role"):
            volunteer.role = request.role.value

        if request.HasField("sort_key"):
            volunteer.sort_key = request.sort_key.value

        if request.HasField("started_volunteering"):
            started_on = parse_date(request.started_volunteering.value)
            if not started_on:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_started_volunteering_date")
            volunteer.started_volunteering = started_on

        if request.HasField("stopped_volunteering"):
            stopped_on = parse_date(request.stopped_volunteering.value)
            if not stopped_on:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_stopped_volunteering_date")
            volunteer.stopped_volunteering = stopped_on

        if request.HasField("show_on_team_page"):
            volunteer.show_on_team_page = request.show_on_team_page.value

        session.flush()

        return volunteer_to_pb(session, volunteer)

    def ListVolunteers(
        self, request: editor_pb2.ListVolunteersReq, context: CouchersContext, session: Session
    ) -> editor_pb2.ListVolunteersRes:
        """List volunteers whose user is visible, optionally including past volunteers.

        Unless ``request.include_past`` is set, only volunteers without a
        ``stopped_volunteering`` date are returned.
        """
        stmt = select(Volunteer).join(LiteUser, LiteUser.id == Volunteer.user_id).where(LiteUser.is_visible)

        if not request.include_past:
            stmt = stmt.where(Volunteer.stopped_volunteering.is_(None))

        # Order by same criteria as public API
        stmt = stmt.order_by(
            Volunteer.sort_key.asc().nulls_last(),
            Volunteer.stopped_volunteering.desc().nulls_first(),
            Volunteer.started_volunteering.asc(),
        )

        matching_volunteers = session.execute(stmt).scalars().all()

        return editor_pb2.ListVolunteersRes(
            volunteers=[volunteer_to_pb(session, entry) for entry in matching_volunteers]
        )

303 volunteers=[volunteer_to_pb(session, volunteer) for volunteer in volunteers] 

304 )