Coverage for src/couchers/servicers/events.py: 84%

500 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from psycopg2.extras import DateTimeTZRange 

7from sqlalchemy.sql import and_, func, or_, select, update 

8 

9from couchers import errors 

10from couchers.context import make_background_user_context 

11from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope 

12from couchers.jobs.enqueue import queue_job 

13from couchers.models import ( 

14 AttendeeStatus, 

15 Cluster, 

16 ClusterSubscription, 

17 Event, 

18 EventCommunityInviteRequest, 

19 EventOccurrence, 

20 EventOccurrenceAttendee, 

21 EventOrganizer, 

22 EventSubscription, 

23 Node, 

24 Thread, 

25 Upload, 

26 User, 

27) 

28from couchers.notifications.notify import notify 

29from couchers.servicers.api import user_model_to_pb 

30from couchers.servicers.blocking import is_not_visible 

31from couchers.servicers.threads import thread_to_pb 

32from couchers.sql import couchers_select as select 

33from couchers.tasks import send_event_community_invite_request_email 

34from couchers.utils import ( 

35 Timestamp_from_datetime, 

36 create_coordinate, 

37 dt_from_millis, 

38 millis_from_dt, 

39 now, 

40 to_aware_datetime, 

41) 

42from proto import events_pb2, events_pb2_grpc, notification_data_pb2 

43from proto.internal import jobs_pb2 

44 

logger = logging.getLogger(__name__)

# API attendance state -> database attendee status (the "not going" state maps to None)
attendancestate2sql = {
    events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
    events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
    events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
}

# database attendee status -> API attendance state, i.e. the inverse of the map above
attendancestate2api = {sql_status: api_state for api_state, sql_status in attendancestate2sql.items()}

# upper bound on the page size of the paginated list RPCs in this module
MAX_PAGINATION_LENGTH = 25

60 

61 

62def _is_event_owner(event: Event, user_id): 

63 """ 

64 Checks whether the user can act as an owner of the event 

65 """ 

66 if event.owner_user: 

67 return event.owner_user_id == user_id 

68 # otherwise owned by a cluster 

69 return event.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None 

70 

71 

def _is_event_organizer(event: Event, user_id):
    """
    Checks whether the user is an organizer of the event
    """
    organizer_row = event.organizers.where(EventOrganizer.user_id == user_id).one_or_none()
    return organizer_row is not None

77 

78 

def _can_moderate_event(session, event: Event, user_id):
    """
    Checks whether the user can moderate the event

    This is the case if the user can moderate the parent node of the cluster owning the event (if there is one),
    or the parent node of the event itself.
    """
    owner_cluster = event.owner_cluster
    # if the event is owned by a cluster, then any moderator of that cluster can moderate this event
    if owner_cluster is not None:
        if can_moderate_node(session, user_id, owner_cluster.parent_node_id):
            return True

    # otherwise it comes down to whether the user can moderate the event's own parent node
    return can_moderate_node(session, user_id, event.parent_node_id)

86 

87 

def _can_edit_event(session, event, user_id):
    """
    Checks whether the user may edit the event: owners, organizers, and moderators of the event can
    """
    if _is_event_owner(event, user_id):
        return True
    if _is_event_organizer(event, user_id):
        return True
    return _can_moderate_event(session, event, user_id)

94 

95 

def event_to_pb(session, occurrence: EventOccurrence, context):
    """
    Converts an event occurrence into an events_pb2.Event message from the point of view of context.user_id

    The going/maybe/organizer/subscriber counts are filtered with where_users_column_visible for that context.
    """
    event = occurrence.event
    viewer_id = context.user_id

    def _count(model, user_column, *criteria):
        # counts rows of `model` matching `criteria`, after applying the user visibility filter for this context
        query = select(func.count()).select_from(model).where_users_column_visible(context, user_column)
        for criterion in criteria:
            query = query.where(criterion)
        return session.execute(query).scalar_one()

    # the earliest-ending occurrence of this event that has not ended yet, if there is one
    upcoming = (
        event.occurrences.where(EventOccurrence.end_time >= now())
        .order_by(EventOccurrence.end_time.asc())
        .limit(1)
        .one_or_none()
    )
    is_next = occurrence.id == upcoming.id if upcoming else False

    # an owning cluster is reported as a community if it is an official cluster, and as a group otherwise
    owner_community_id = owner_group_id = None
    owner_cluster = event.owner_cluster
    if owner_cluster:
        if owner_cluster.is_official_cluster:
            owner_community_id = owner_cluster.parent_node_id
        else:
            owner_group_id = owner_cluster.id

    # no attendance row for the viewing user means "not going" (None in attendancestate2api)
    own_attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == viewer_id).one_or_none()
    own_status = own_attendance.attendee_status if own_attendance else None

    can_moderate = _can_moderate_event(session, event, viewer_id)
    can_edit = _can_edit_event(session, event, viewer_id)

    on_this_occurrence = EventOccurrenceAttendee.occurrence_id == occurrence.id
    going_count = _count(
        EventOccurrenceAttendee,
        EventOccurrenceAttendee.user_id,
        on_this_occurrence,
        EventOccurrenceAttendee.attendee_status == AttendeeStatus.going,
    )
    maybe_count = _count(
        EventOccurrenceAttendee,
        EventOccurrenceAttendee.user_id,
        on_this_occurrence,
        EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe,
    )
    organizer_count = _count(EventOrganizer, EventOrganizer.user_id, EventOrganizer.event_id == event.id)
    subscriber_count = _count(EventSubscription, EventSubscription.user_id, EventSubscription.event_id == event.id)

    # online info is only set when there is a link, offline info only when there is a geometry
    online_information = None
    if occurrence.link:
        online_information = events_pb2.OnlineEventInformation(link=occurrence.link)

    offline_information = None
    if occurrence.geom:
        offline_information = events_pb2.OfflineEventInformation(
            lat=occurrence.coordinates[0],
            lng=occurrence.coordinates[1],
            address=occurrence.address,
        )

    photo_url = occurrence.photo.full_url if occurrence.photo else None

    is_organizer = event.organizers.where(EventOrganizer.user_id == viewer_id).one_or_none() is not None
    is_subscriber = event.subscribers.where(EventSubscription.user_id == viewer_id).one_or_none() is not None

    return events_pb2.Event(
        event_id=occurrence.id,
        is_next=is_next,
        is_cancelled=occurrence.is_cancelled,
        is_deleted=occurrence.is_deleted,
        title=event.title,
        slug=event.slug,
        content=occurrence.content,
        photo_url=photo_url,
        online_information=online_information,
        offline_information=offline_information,
        created=Timestamp_from_datetime(occurrence.created),
        last_edited=Timestamp_from_datetime(occurrence.last_edited),
        creator_user_id=occurrence.creator_user_id,
        start_time=Timestamp_from_datetime(occurrence.start_time),
        end_time=Timestamp_from_datetime(occurrence.end_time),
        timezone=occurrence.timezone,
        start_time_display=str(occurrence.start_time),
        end_time_display=str(occurrence.end_time),
        attendance_state=attendancestate2api[own_status],
        organizer=is_organizer,
        subscriber=is_subscriber,
        going_count=going_count,
        maybe_count=maybe_count,
        organizer_count=organizer_count,
        subscriber_count=subscriber_count,
        owner_user_id=event.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(session, event.thread_id),
        can_edit=can_edit,
        can_moderate=can_moderate,
    )

195 

196 

def _get_event_and_occurrence_query(occurrence_id, include_deleted: bool):
    """
    Builds the select for the (Event, EventOccurrence) pair belonging to the given occurrence id

    Unless include_deleted is set, occurrences flagged as deleted are filtered out.
    """
    query = select(Event, EventOccurrence).where(EventOccurrence.id == occurrence_id)
    query = query.where(EventOccurrence.event_id == Event.id)

    if include_deleted:
        return query

    return query.where(~EventOccurrence.is_deleted)

208 

209 

def _get_event_and_occurrence_one(
    session, occurrence_id, include_deleted: bool = False
) -> tuple[Event, EventOccurrence]:
    """
    Fetches the (Event, EventOccurrence) pair for an occurrence id using Result.one(), which raises unless exactly
    one row matches
    """
    query = _get_event_and_occurrence_query(occurrence_id, include_deleted)
    result = session.execute(query)
    return result.one()

214 

215 

def _get_event_and_occurrence_one_or_none(
    session, occurrence_id, include_deleted: bool = False
) -> tuple[Event, EventOccurrence] | None:
    """
    Fetches the (Event, EventOccurrence) pair for an occurrence id, or None if no row matches
    """
    query = _get_event_and_occurrence_query(occurrence_id, include_deleted)
    result = session.execute(query)
    return result.one_or_none()

220 

221 

def _check_occurrence_time_validity(start_time, end_time, context):
    """
    Aborts the call with INVALID_ARGUMENT if the occurrence times are not acceptable

    An occurrence must not start in the past, must not end before it starts, must not last longer than 7 days,
    and must not start more than 365 days from now.
    """
    invalid_argument = grpc.StatusCode.INVALID_ARGUMENT
    max_duration = timedelta(days=7)
    max_lead_time = timedelta(days=365)

    if start_time < now():
        context.abort(invalid_argument, errors.EVENT_IN_PAST)

    if end_time < start_time:
        context.abort(invalid_argument, errors.EVENT_ENDS_BEFORE_STARTS)

    duration = end_time - start_time
    if duration > max_duration:
        context.abort(invalid_argument, errors.EVENT_TOO_LONG)

    lead_time = start_time - now()
    if lead_time > max_lead_time:
        context.abort(invalid_argument, errors.EVENT_TOO_FAR_IN_FUTURE)

231 

232 

def get_users_to_notify_for_new_event(session, occurrence):
    """
    Returns the users to notify, as well as the community id that is being notified (None if based on geo search)
    """
    event = occurrence.event
    cluster = event.parent_node.official_cluster

    # node id 1 is treated as the global community, which gets no notifications
    if cluster.parent_node_id == 1:
        logger.info("The Global Community is too big for email notifications.")
        return [], event.parent_node_id

    # events created by a community admin, or in a leaf community, go to all visible members
    if occurrence.creator_user in cluster.admins or cluster.is_leaf:
        visible_members = cluster.members.where(User.is_visible)
        return list(visible_members), event.parent_node_id

    # otherwise only notify visible community subscribers located near the event
    max_radius = 20000  # m
    # NOTE(review): dividing by 111111 presumably converts metres to degrees for ST_DWithin -- confirm
    nearby_subscribers = (
        select(User)
        .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
        .where(User.is_visible)
        .where(ClusterSubscription.cluster_id == cluster.id)
        .where(func.ST_DWithin(User.geom, occurrence.geom, max_radius / 111111))
    )
    users = session.execute(nearby_subscribers).scalars().all()
    return users, None

257 

258 

def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload):
    """
    Background job to generate/fan out notifications about a newly created event
    """
    from couchers.servicers.communities import community_to_pb

    logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}")

    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
        creator = occurrence.creator_user

        # node_id is None when the recipients were picked by geo search rather than community membership
        recipients, node_id = get_users_to_notify_for_new_event(session, occurrence)

        inviting_user_query = select(User).where(User.id == payload.inviting_user_id)
        inviting_user = session.execute(inviting_user_query).scalar_one_or_none()

        if not inviting_user:
            logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?")
            return

        if payload.approved:
            topic_action = "event:create_approved"
        else:
            topic_action = "event:create_any"

        geo_based = node_id is None

        for recipient in recipients:
            # skip recipients for whom the creator is not visible
            if is_not_visible(session, recipient.id, creator.id):
                continue

            # the protos are rendered from the recipient's point of view
            recipient_context = make_background_user_context(user_id=recipient.id)
            if geo_based:
                in_community = None
            else:
                in_community = community_to_pb(session, event.parent_node, recipient_context)

            data = notification_data_pb2.EventCreate(
                event=event_to_pb(session, occurrence, recipient_context),
                inviting_user=user_model_to_pb(inviting_user, session, recipient_context),
                nearby=True if geo_based else None,
                in_community=in_community,
            )
            notify(
                session,
                user_id=recipient.id,
                topic_action=topic_action,
                key=payload.occurrence_id,
                data=data,
            )

295 

296 

def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload):
    """
    Background job to fan out notifications about an updated event occurrence

    Notifies every subscriber of the event and every attendee of the occurrence, skipping users for whom
    is_not_visible reports the updating user as not visible. Does nothing (besides logging an error) if the
    updating user can no longer be found.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)

        updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one_or_none()

        # the user may be gone by the time the job runs; bail out instead of crashing on `updating_user.id`
        if not updating_user:
            logger.error(f"Updating user {payload.updating_user_id} is gone while trying to send event notification?")
            return

        subscribed_user_ids = {user.id for user in event.subscribers}
        attending_user_ids = {attendance.user_id for attendance in occurrence.attendances}

        # a user who is both subscribed and attending is only notified once
        for user_id in subscribed_user_ids | attending_user_ids:
            if is_not_visible(session, user_id, updating_user.id):
                continue
            # the protos are rendered from the recipient's point of view
            context = make_background_user_context(user_id=user_id)
            notify(
                session,
                user_id=user_id,
                topic_action="event:update",
                key=payload.occurrence_id,
                data=notification_data_pb2.EventUpdate(
                    event=event_to_pb(session, occurrence, context),
                    updating_user=user_model_to_pb(updating_user, session, context),
                    updated_items=payload.updated_items,
                ),
            )

321 

322 

def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload):
    """
    Background job to fan out notifications about a cancelled event occurrence

    Notifies every subscriber of the event and every attendee of the occurrence, skipping users for whom
    is_not_visible reports the cancelling user as not visible. Does nothing (besides logging an error) if the
    cancelling user can no longer be found.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)

        cancelling_user = session.execute(
            select(User).where(User.id == payload.cancelling_user_id)
        ).scalar_one_or_none()

        # the user may be gone by the time the job runs; bail out instead of crashing on `cancelling_user.id`
        if not cancelling_user:
            logger.error(
                f"Cancelling user {payload.cancelling_user_id} is gone while trying to send event notification?"
            )
            return

        subscribed_user_ids = {user.id for user in event.subscribers}
        attending_user_ids = {attendance.user_id for attendance in occurrence.attendances}

        # a user who is both subscribed and attending is only notified once
        for user_id in subscribed_user_ids | attending_user_ids:
            if is_not_visible(session, user_id, cancelling_user.id):
                continue
            # the protos are rendered from the recipient's point of view
            context = make_background_user_context(user_id=user_id)
            notify(
                session,
                user_id=user_id,
                topic_action="event:cancel",
                key=payload.occurrence_id,
                data=notification_data_pb2.EventCancel(
                    event=event_to_pb(session, occurrence, context),
                    cancelling_user=user_model_to_pb(cancelling_user, session, context),
                ),
            )

348 

349 

def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload):
    """
    Background job to fan out notifications about a deleted event occurrence

    Notifies every subscriber of the event and every attendee of the occurrence; the occurrence is looked up
    including deleted ones.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(
            session, occurrence_id=payload.occurrence_id, include_deleted=True
        )

        recipient_ids = set()
        recipient_ids.update(subscriber.id for subscriber in event.subscribers)
        recipient_ids.update(attendance.user_id for attendance in occurrence.attendances)

        for recipient_id in recipient_ids:
            # the event proto is rendered from the recipient's point of view
            recipient_context = make_background_user_context(user_id=recipient_id)
            data = notification_data_pb2.EventDelete(
                event=event_to_pb(session, occurrence, recipient_context),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action="event:delete",
                key=payload.occurrence_id,
                data=data,
            )

370 

371 

372class Events(events_pb2_grpc.EventsServicer): 

def CreateEvent(self, request, context, session):
    """
    Creates a new event together with its first occurrence

    The calling user becomes the owner, an organizer and a subscriber of the event, and is marked as going to
    the occurrence. After committing, a background job is queued to fan out notifications about the new event.

    Aborts with FAILED_PRECONDITION if the user's profile is incomplete or the chosen community has events
    disabled, and with INVALID_ARGUMENT for missing/invalid fields, an unknown community, or an unknown photo.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    if not user.has_completed_profile:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_CREATE_EVENT)
    if not request.title:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_TITLE)
    if not request.content:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
    if request.HasField("online_information"):
        online = True
        geom = None
        address = None
        if not request.online_information.link:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
        link = request.online_information.link
    elif request.HasField("offline_information"):
        online = False
        # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value
        if not (
            request.offline_information.address
            and request.offline_information.lat
            and request.offline_information.lng
        ):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
        if request.offline_information.lat == 0 and request.offline_information.lng == 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
        geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
        address = request.offline_information.address
        link = None
    else:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)

    start_time = to_aware_datetime(request.start_time)
    end_time = to_aware_datetime(request.end_time)

    _check_occurrence_time_validity(start_time, end_time, context)

    if request.parent_community_id:
        parent_node = session.execute(
            select(Node).where(Node.id == request.parent_community_id)
        ).scalar_one_or_none()

        # the existence check has to come before touching parent_node.official_cluster below
        if not parent_node:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)

        if not parent_node.official_cluster.events_enabled:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENTS_NOT_ENABLED)
    else:
        if online:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_MISSING_PARENT_COMMUNITY)
        # parent community computed from geom
        parent_node = get_parent_node_at_location(session, geom)

        if not parent_node:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)

    if (
        request.photo_key
        and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
    ):
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)

    event = Event(
        title=request.title,
        parent_node_id=parent_node.id,
        owner_user_id=context.user_id,
        thread=Thread(),
        creator_user_id=context.user_id,
    )
    session.add(event)

    occurrence = EventOccurrence(
        event=event,
        content=request.content,
        geom=geom,
        address=address,
        link=link,
        photo_key=request.photo_key if request.photo_key != "" else None,
        # timezone=timezone,
        during=DateTimeTZRange(start_time, end_time),
        creator_user_id=context.user_id,
    )
    session.add(occurrence)

    # the creator organizes, subscribes to, and attends their own event
    session.add(
        EventOrganizer(
            user_id=context.user_id,
            event=event,
        )
    )

    session.add(
        EventSubscription(
            user_id=context.user_id,
            event=event,
        )
    )

    session.add(
        EventOccurrenceAttendee(
            user_id=context.user_id,
            occurrence=occurrence,
            attendee_status=AttendeeStatus.going,
        )
    )

    session.commit()

    # the profile completeness was already enforced at the top, so the job is always queued here
    queue_job(
        session,
        "generate_event_create_notifications",
        payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
            inviting_user_id=user.id,
            occurrence_id=occurrence.id,
            approved=False,
        ),
    )

    return event_to_pb(session, occurrence, context)

490 

def ScheduleEvent(self, request, context, session):
    """
    Schedules a new occurrence for the event that the occurrence request.event_id belongs to

    The calling user must be allowed to edit the event, and is marked as going to the new occurrence.

    Aborts with INVALID_ARGUMENT for missing/invalid fields or an unknown photo, NOT_FOUND if the occurrence
    does not exist, PERMISSION_DENIED if the user may not edit the event or the occurrence is cancelled, and
    FAILED_PRECONDITION if the new time range overlaps another occurrence of the same event.
    """
    if not request.content:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
    if request.HasField("online_information"):
        geom = None
        address = None
        # same rule as in CreateEvent/UpdateEvent: an online occurrence is useless without a link
        if not request.online_information.link:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
        link = request.online_information.link
    elif request.HasField("offline_information"):
        # protobuf parses a missing value as 0.0, so a zero coordinate is treated as missing
        if not (
            request.offline_information.address
            and request.offline_information.lat
            and request.offline_information.lng
        ):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
        if request.offline_information.lat == 0 and request.offline_information.lng == 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
        geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
        address = request.offline_information.address
        link = None
    else:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)

    start_time = to_aware_datetime(request.start_time)
    end_time = to_aware_datetime(request.end_time)

    _check_occurrence_time_validity(start_time, end_time, context)

    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not res:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = res

    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    if (
        request.photo_key
        and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
    ):
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)

    during = DateTimeTZRange(start_time, end_time)

    # && is the overlap operator for ranges
    if (
        session.execute(
            select(EventOccurrence.id)
            .where(EventOccurrence.event_id == event.id)
            .where(EventOccurrence.during.op("&&")(during))
            .limit(1)
        )
        .scalars()
        .one_or_none()
        is not None
    ):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)

    occurrence = EventOccurrence(
        event=event,
        content=request.content,
        geom=geom,
        address=address,
        link=link,
        photo_key=request.photo_key if request.photo_key != "" else None,
        # timezone=timezone,
        during=during,
        creator_user_id=context.user_id,
    )
    session.add(occurrence)

    session.add(
        EventOccurrenceAttendee(
            user_id=context.user_id,
            occurrence=occurrence,
            attendee_status=AttendeeStatus.going,
        )
    )

    session.flush()

    # TODO: notify

    return event_to_pb(session, occurrence, context)

578 

def UpdateEvent(self, request, context, session):
    """
    Updates an event occurrence, or (with update_all_future) this and all later occurrences of the same event

    The title is stored on the event itself; all other fields are stored per occurrence. Times cannot be changed
    together with update_all_future. If any of title, content, location or times changed and
    request.should_notify is set, a background job is queued to notify subscribers and attendees.

    Aborts with NOT_FOUND if the occurrence does not exist, PERMISSION_DENIED if the user may not edit the event
    or the occurrence is cancelled, INVALID_ARGUMENT for invalid fields, and FAILED_PRECONDITION if the new time
    range overlaps another occurrence or the occurrence ended more than 24 hours ago.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not res:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = res

    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

    # the things that were updated and need to be notified about
    notify_updated = []

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    # column values to write to the occurrence(s) in the bulk update below
    occurrence_update = {"last_edited": now()}

    if request.HasField("title"):
        notify_updated.append("title")
        event.title = request.title.value
        event.last_edited = now()

    if request.HasField("content"):
        notify_updated.append("content")
        occurrence_update["content"] = request.content.value

    if request.HasField("photo_key"):
        occurrence_update["photo_key"] = request.photo_key.value

    if request.HasField("online_information"):
        notify_updated.append("location")
        if not request.online_information.link:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
        occurrence_update["link"] = request.online_information.link
        occurrence_update["geom"] = None
        occurrence_update["address"] = None
    elif request.HasField("offline_information"):
        notify_updated.append("location")
        occurrence_update["link"] = None
        if request.offline_information.lat == 0 and request.offline_information.lng == 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
        occurrence_update["geom"] = create_coordinate(
            request.offline_information.lat, request.offline_information.lng
        )
        occurrence_update["address"] = request.offline_information.address

    if request.HasField("start_time") or request.HasField("end_time"):
        if request.update_all_future:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_CANT_UPDATE_ALL_TIMES)
        # a time that is not part of the request keeps its current value
        if request.HasField("start_time"):
            notify_updated.append("start time")
            start_time = to_aware_datetime(request.start_time)
        else:
            start_time = occurrence.start_time
        if request.HasField("end_time"):
            notify_updated.append("end time")
            end_time = to_aware_datetime(request.end_time)
        else:
            end_time = occurrence.end_time

        _check_occurrence_time_validity(start_time, end_time, context)

        during = DateTimeTZRange(start_time, end_time)

        # && is the overlap operator for ranges
        if (
            session.execute(
                select(EventOccurrence.id)
                .where(EventOccurrence.event_id == event.id)
                .where(EventOccurrence.id != occurrence.id)
                .where(EventOccurrence.during.op("&&")(during))
                .limit(1)
            )
            .scalars()
            .one_or_none()
            is not None
        ):
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)

        occurrence_update["during"] = during

    # TODO
    # if request.HasField("timezone"):
    #     occurrence_update["timezone"] = request.timezone

    # allow editing any event which hasn't ended more than 24 hours before now
    # when editing all future events, we edit all which have not yet ended

    if request.update_all_future:
        session.execute(
            update(EventOccurrence)
            # only touch occurrences of this event; without this filter, occurrences of every event would be
            # overwritten
            .where(EventOccurrence.event_id == event.id)
            .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
            .where(EventOccurrence.start_time >= occurrence.start_time)
            .values(occurrence_update)
            .execution_options(synchronize_session=False)
        )
    else:
        if occurrence.end_time < now() - timedelta(hours=24):
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
        session.execute(
            update(EventOccurrence)
            .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
            .where(EventOccurrence.id == occurrence.id)
            .values(occurrence_update)
            .execution_options(synchronize_session=False)
        )

    session.flush()

    if notify_updated:
        if request.should_notify:
            logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying")

            queue_job(
                session,
                "generate_event_update_notifications",
                payload=jobs_pb2.GenerateEventUpdateNotificationsPayload(
                    updating_user_id=user.id,
                    occurrence_id=occurrence.id,
                    updated_items=notify_updated,
                ),
            )
        else:
            logger.info(
                f"Fields {','.join(notify_updated)} updated in event {event.id=}, but skipping notifications"
            )

    # since we have synchronize_session=False, we have to refresh the object
    session.refresh(occurrence)

    return event_to_pb(session, occurrence, context)

712 

def GetEvent(self, request, context, session):
    """
    Returns the non-deleted occurrence with id request.event_id, aborting with NOT_FOUND if there is none
    """
    query = select(EventOccurrence).where(EventOccurrence.id == request.event_id)
    query = query.where(~EventOccurrence.is_deleted)
    occurrence = session.execute(query).scalar_one_or_none()

    if not occurrence:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    return event_to_pb(session, occurrence, context)

722 

def CancelEvent(self, request, context, session):
    """
    Marks an occurrence as cancelled and queues the job that notifies about the cancellation

    Aborts with NOT_FOUND if the occurrence does not exist, PERMISSION_DENIED if the user may not edit the
    event, and FAILED_PRECONDITION if the occurrence ended more than 24 hours ago.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = found

    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

    cancel_cutoff = now() - timedelta(hours=24)
    if occurrence.end_time < cancel_cutoff:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_CANCEL_OLD_EVENT)

    occurrence.is_cancelled = True

    cancel_payload = jobs_pb2.GenerateEventCancelNotificationsPayload(
        cancelling_user_id=context.user_id,
        occurrence_id=occurrence.id,
    )
    queue_job(session, "generate_event_cancel_notifications", payload=cancel_payload)

    return empty_pb2.Empty()

748 

def RequestCommunityInvite(self, request, context, session):
    """
    Records a community invite request for an occurrence and sends the corresponding request email

    Aborts with NOT_FOUND if the occurrence does not exist, PERMISSION_DENIED if the user may not edit the event
    or the occurrence is cancelled, and FAILED_PRECONDITION if the occurrence ended more than 24 hours ago, the
    user has already made a request for it, or a request for it has already been approved.
    """
    user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = found

    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    # each user can only ask once per occurrence
    already_requested = any(req.user_id == context.user_id for req in occurrence.community_invite_requests)
    if already_requested:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_REQUESTED)

    # nothing left to ask for once any request for this occurrence has been approved
    already_approved = any(req.approved for req in occurrence.community_invite_requests)
    if already_approved:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_APPROVED)

    invite_request = EventCommunityInviteRequest(
        occurrence_id=request.event_id,
        user_id=context.user_id,
    )
    session.add(invite_request)
    session.flush()

    send_event_community_invite_request_email(session, invite_request)

    return empty_pb2.Empty()

786 

def ListEventOccurrences(self, request, context, session):
    """
    Lists the occurrences of the event that the occurrence request.event_id belongs to, one page at a time

    By default upcoming occurrences are returned in ascending start time order; with request.past set, past
    occurrences are returned in descending start time order. Cancelled occurrences are only included if
    request.include_cancelled is set, and deleted occurrences never are.

    Aborts with NOT_FOUND if the occurrence does not exist.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    # the page token is a unix timestamp (in milliseconds) of where we left off
    page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
    occurrence = session.execute(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
    ).scalar_one_or_none()
    if not occurrence:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    # only occurrences of the requested event; comparing against the Event.id column instead would match
    # occurrences of every event
    occurrences = (
        select(EventOccurrence)
        .where(EventOccurrence.event_id == occurrence.event_id)
        .where(~EventOccurrence.is_deleted)
    )

    if not request.include_cancelled:
        occurrences = occurrences.where(~EventOccurrence.is_cancelled)

    if not request.past:
        occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
            EventOccurrence.start_time.asc()
        )
    else:
        occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
            EventOccurrence.start_time.desc()
        )

    # fetch one extra row to find out whether there is another page
    occurrences = occurrences.limit(page_size + 1)
    occurrences = session.execute(occurrences).scalars().all()

    return events_pb2.ListEventOccurrencesRes(
        events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
        next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
    )

820 

def ListEventAttendees(self, request, context, session):
    """
    Lists the user ids of the attendees of an occurrence in ascending user id order, one page at a time

    The page token is the user id to continue from. Aborts with NOT_FOUND if the occurrence does not exist.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_user_id = int(request.page_token) if request.page_token else 0

    occurrence_query = select(EventOccurrence).where(EventOccurrence.id == request.event_id)
    occurrence_query = occurrence_query.where(~EventOccurrence.is_deleted)
    occurrence = session.execute(occurrence_query).scalar_one_or_none()
    if not occurrence:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    # fetch one extra row to find out whether there is another page
    attendee_query = (
        select(EventOccurrenceAttendee)
        .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
        .where(EventOccurrenceAttendee.user_id >= first_user_id)
        .order_by(EventOccurrenceAttendee.user_id)
        .limit(page_size + 1)
    )
    attendees = session.execute(attendee_query).scalars().all()

    has_more = len(attendees) > page_size
    return events_pb2.ListEventAttendeesRes(
        attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]],
        next_page_token=str(attendees[-1].user_id) if has_more else None,
    )

845 

def ListEventSubscribers(self, request, context, session):
    """Return one page of user IDs subscribed to an event.

    ``request.event_id`` identifies an occurrence; subscriptions are looked
    up on the event that occurrence belongs to. Aborts with ``NOT_FOUND``
    when the lookup helper returns nothing. ``request.page_token``, when
    set, is the user ID the page starts at (inclusive); results are in
    ascending user ID order.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_user_id = int(request.page_token) if request.page_token else 0

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
    event, _ = found

    # Fetch one row beyond the page: its presence tells us there is a next
    # page, and its user ID becomes the token for that page.
    subscriber_query = (
        select(EventSubscription)
        .where_users_column_visible(context, EventSubscription.user_id)
        .where(EventSubscription.event_id == event.id)
        .where(EventSubscription.user_id >= first_user_id)
        .order_by(EventSubscription.user_id)
        .limit(limit + 1)
    )
    rows = session.execute(subscriber_query).scalars().all()

    has_more = len(rows) > limit
    return events_pb2.ListEventSubscribersRes(
        subscriber_user_ids=[row.user_id for row in rows[:limit]],
        next_page_token=str(rows[-1].user_id) if has_more else None,
    )

869 

def ListEventOrganizers(self, request, context, session):
    """Return one page of user IDs organizing an event.

    ``request.event_id`` identifies an occurrence; organizers are looked up
    on the event that occurrence belongs to. Aborts with ``NOT_FOUND`` when
    the lookup helper returns nothing. ``request.page_token``, when set, is
    the user ID the page starts at (inclusive); results are in ascending
    user ID order.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_user_id = int(request.page_token) if request.page_token else 0

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
    event, _ = found

    # Fetch one row beyond the page: its presence tells us there is a next
    # page, and its user ID becomes the token for that page.
    organizer_query = (
        select(EventOrganizer)
        .where_users_column_visible(context, EventOrganizer.user_id)
        .where(EventOrganizer.event_id == event.id)
        .where(EventOrganizer.user_id >= first_user_id)
        .order_by(EventOrganizer.user_id)
        .limit(limit + 1)
    )
    rows = session.execute(organizer_query).scalars().all()

    has_more = len(rows) > limit
    return events_pb2.ListEventOrganizersRes(
        organizer_user_ids=[row.user_id for row in rows[:limit]],
        next_page_token=str(rows[-1].user_id) if has_more else None,
    )

893 

def TransferEvent(self, request, context, session):
    """Transfer ownership of an event from a user to a group or community.

    The new owner is selected by the ``new_owner`` oneof on the request:
    ``new_owner_group_id`` picks a non-official cluster by ID, while
    ``new_owner_community_id`` picks the official cluster whose parent node
    has that ID.

    Aborts with:
      * ``NOT_FOUND`` if the event cannot be found, or if no matching
        cluster exists (including when no ``new_owner`` field is set);
      * ``PERMISSION_DENIED`` if ``_can_edit_event`` rejects the caller or
        the occurrence is cancelled;
      * ``FAILED_PRECONDITION`` if the occurrence ended more than 24 hours
        ago.

    Returns the updated event as produced by ``event_to_pb``.
    """
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not res:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = res

    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_TRANSFER_PERMISSION_DENIED)

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    # Default to "no cluster" so that a request with neither oneof field set
    # falls through to the NOT_FOUND abort below instead of raising
    # UnboundLocalError on an unassigned local.
    cluster = None
    new_owner_kind = request.WhichOneof("new_owner")
    if new_owner_kind == "new_owner_group_id":
        cluster = session.execute(
            select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
        ).scalar_one_or_none()
    elif new_owner_kind == "new_owner_community_id":
        cluster = session.execute(
            select(Cluster)
            .where(Cluster.parent_node_id == request.new_owner_community_id)
            .where(Cluster.is_official_cluster)
        ).scalar_one_or_none()

    if not cluster:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

    # The event is owned by either a user or a cluster; clear the user side
    # when handing it to the cluster.
    event.owner_user = None
    event.owner_cluster = cluster

    session.commit()
    return event_to_pb(session, occurrence, context)

929 

def SetEventSubscription(self, request, context, session):
    """Subscribe the calling user to, or unsubscribe them from, an event.

    ``request.subscribe`` is the desired end state; the call is a no-op when
    the user is already in that state. Aborts with ``NOT_FOUND`` if the
    event cannot be found, ``PERMISSION_DENIED`` if the occurrence is
    cancelled, and ``FAILED_PRECONDITION`` if it ended more than 24 hours
    ago. Returns the event as produced by ``event_to_pb``.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = found

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    oldest_editable_end = now() - timedelta(hours=24)
    if occurrence.end_time < oldest_editable_end:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    subscription_query = (
        select(EventSubscription)
        .where(EventSubscription.user_id == context.user_id)
        .where(EventSubscription.event_id == event.id)
    )
    existing = session.execute(subscription_query).scalar_one_or_none()

    if request.subscribe:
        # wants to be subscribed: create the row only if it is missing
        if not existing:
            session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
    elif existing:
        # wants to be unsubscribed and currently is subscribed: drop the row
        session.delete(existing)

    session.flush()

    return event_to_pb(session, occurrence, context)

960 

def SetEventAttendance(self, request, context, session):
    """Set the calling user's attendance state for an event occurrence.

    ``ATTENDANCE_STATE_NOT_GOING`` removes any existing attendance row;
    any other state is translated through ``attendancestate2sql`` and
    stored, updating the existing row or creating a new one. Aborts with
    ``NOT_FOUND`` if no non-deleted occurrence matches ``request.event_id``,
    ``PERMISSION_DENIED`` if the occurrence is cancelled, and
    ``FAILED_PRECONDITION`` if it ended more than 24 hours ago. Returns the
    event as produced by ``event_to_pb``.
    """
    occurrence_query = (
        select(EventOccurrence)
        .where(EventOccurrence.id == request.event_id)
        .where(~EventOccurrence.is_deleted)
    )
    occurrence = session.execute(occurrence_query).scalar_one_or_none()

    if not occurrence:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    oldest_editable_end = now() - timedelta(hours=24)
    if occurrence.end_time < oldest_editable_end:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    attendance_query = (
        select(EventOccurrenceAttendee)
        .where(EventOccurrenceAttendee.user_id == context.user_id)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
    )
    existing = session.execute(attendance_query).scalar_one_or_none()

    if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING:
        # "not going" is represented by the absence of a row
        if existing:
            session.delete(existing)
    elif existing:
        # already has a row: just switch its status
        existing.attendee_status = attendancestate2sql[request.attendance_state]
    else:
        # no row yet: create one with the requested status
        session.add(
            EventOccurrenceAttendee(
                user_id=context.user_id,
                occurrence_id=occurrence.id,
                attendee_status=attendancestate2sql[request.attendance_state],
            )
        )

    session.flush()

    return event_to_pb(session, occurrence, context)

1000 

def ListMyEvents(self, request, context, session):
    """List event occurrences related to the calling user.

    An occurrence is included when it matches at least one enabled
    relation: the user is subscribed to the event, organizes it, attends
    the occurrence, or the event's parent node is one whose official
    cluster the user is subscribed to. When none of ``subscribed``,
    ``attending``, ``organizing`` or ``my_communities`` is set on the
    request, all four relations are enabled.

    Two pagination modes are supported:
      * ``page_number`` set: offset pagination; ``page_token`` is ignored
        and, because only ``page_size`` rows are fetched,
        ``next_page_token`` is never populated;
      * otherwise: token pagination, where the token is a millisecond unix
        timestamp compared against ``end_time``.

    ``total_items`` is the number of rows matching all filters, including
    the time cut-off, before offset/limit are applied.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    # the page token is a unix timestamp of where we left off; it only
    # applies when we are not paginating by page number
    if request.page_token and not request.page_number:
        cutoff = dt_from_millis(int(request.page_token))
    else:
        cutoff = now()

    # 1-based page number, translated into a row offset
    current_page = request.page_number or 1
    skip = (current_page - 1) * limit

    query = (
        select(EventOccurrence)
        .join(Event, Event.id == EventOccurrence.event_id)
        .where(~EventOccurrence.is_deleted)
    )

    no_filter_given = not (
        request.subscribed or request.attending or request.organizing or request.my_communities
    )

    # Each enabled relation is outer-joined (restricted to the current user)
    # and contributes an "a matching row exists" criterion; the criteria
    # are OR-ed together below.
    criteria = []

    if request.subscribed or no_filter_given:
        query = query.outerjoin(
            EventSubscription,
            and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
        )
        criteria.append(EventSubscription.user_id.is_not(None))

    if request.organizing or no_filter_given:
        query = query.outerjoin(
            EventOrganizer,
            and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
        )
        criteria.append(EventOrganizer.user_id.is_not(None))

    if request.attending or no_filter_given:
        query = query.outerjoin(
            EventOccurrenceAttendee,
            and_(
                EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                EventOccurrenceAttendee.user_id == context.user_id,
            ),
        )
        criteria.append(EventOccurrenceAttendee.user_id.is_not(None))

    if request.my_communities or no_filter_given:
        # IDs of nodes whose official cluster the user is subscribed to
        community_query = (
            select(Node.id)
            .join(Cluster, Cluster.parent_node_id == Node.id)
            .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
            .where(ClusterSubscription.user_id == context.user_id)
            .where(Cluster.is_official_cluster)
            .order_by(Node.id)
            .limit(100000)
        )
        community_node_ids = session.execute(community_query).scalars().all()
        criteria.append(Event.parent_node_id.in_(community_node_ids))

    query = query.where(or_(*criteria))

    if not request.include_cancelled:
        query = query.where(~EventOccurrence.is_cancelled)

    one_second = timedelta(seconds=1)
    if request.past:
        # past events: ended before the cut-off, most recent first
        query = query.where(EventOccurrence.end_time < cutoff + one_second).order_by(
            EventOccurrence.start_time.desc()
        )
    else:
        # upcoming events: ending after the cut-off, soonest first
        query = query.where(EventOccurrence.end_time > cutoff - one_second).order_by(
            EventOccurrence.start_time.asc()
        )

    # Count the total number of items for pagination
    total_items = session.execute(select(func.count()).select_from(query.subquery())).scalar()

    # Page-number mode fetches exactly one page; token mode fetches one
    # extra row to detect whether a next page exists.
    if request.page_number:
        query = query.offset(skip).limit(limit)
    else:
        query = query.limit(limit + 1)
    rows = session.execute(query).scalars().all()

    has_more = len(rows) > limit
    return events_pb2.ListMyEventsRes(
        events=[event_to_pb(session, row, context) for row in rows[:limit]],
        next_page_token=str(millis_from_dt(rows[-1].end_time)) if has_more else None,
        total_items=total_items,
    )

1085 

def ListAllEvents(self, request, context, session):
    """List all non-deleted event occurrences, one page at a time.

    ``request.past`` selects occurrences that ended before the cut-off
    (latest start first) instead of those ending after it (earliest start
    first). The cut-off is ``request.page_token`` parsed as a millisecond
    unix timestamp, or the current time when no token is given. Cancelled
    occurrences are excluded unless ``request.include_cancelled`` is set.
    """
    limit = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    # the page token is a unix timestamp of where we left off
    if request.page_token:
        cutoff = dt_from_millis(int(request.page_token))
    else:
        cutoff = now()

    query = (
        select(EventOccurrence)
        .join(Event, Event.id == EventOccurrence.event_id)
        .where(~EventOccurrence.is_deleted)
    )

    if not request.include_cancelled:
        query = query.where(~EventOccurrence.is_cancelled)

    one_second = timedelta(seconds=1)
    if request.past:
        query = query.where(EventOccurrence.end_time < cutoff + one_second).order_by(
            EventOccurrence.start_time.desc()
        )
    else:
        query = query.where(EventOccurrence.end_time > cutoff - one_second).order_by(
            EventOccurrence.start_time.asc()
        )

    # Fetch one row beyond the page: its presence tells us there is a next
    # page, and its end time becomes the token for that page.
    rows = session.execute(query.limit(limit + 1)).scalars().all()

    has_more = len(rows) > limit
    return events_pb2.ListAllEventsRes(
        events=[event_to_pb(session, row, context) for row in rows[:limit]],
        next_page_token=str(millis_from_dt(rows[-1].end_time)) if has_more else None,
    )

1114 

def InviteEventOrganizer(self, request, context, session):
    """Add ``request.user_id`` as an organizer of an event and notify them.

    Aborts with:
      * ``NOT_FOUND`` if the event cannot be found, or if the invited user
        is not returned by the visibility-filtered user query;
      * ``PERMISSION_DENIED`` if ``_can_edit_event`` rejects the caller or
        the occurrence is cancelled;
      * ``FAILED_PRECONDITION`` if the occurrence ended more than 24 hours
        ago.

    On success an ``event:invite_organizer`` notification is sent to the
    invited user and an empty message is returned.
    """
    inviter = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = found

    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    oldest_editable_end = now() - timedelta(hours=24)
    if occurrence.end_time < oldest_editable_end:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    invitee_query = select(User).where_users_visible(context).where(User.id == request.user_id)
    invitee = session.execute(invitee_query).scalar_one_or_none()
    if not invitee:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    new_organizer = EventOrganizer(
        user_id=request.user_id,
        event=event,
    )
    session.add(new_organizer)
    session.flush()

    # The notification payload is rendered with a context built for the
    # invited user rather than the caller's context.
    invitee_context = make_background_user_context(user_id=request.user_id)
    payload = notification_data_pb2.EventInviteOrganizer(
        event=event_to_pb(session, occurrence, invitee_context),
        inviting_user=user_model_to_pb(inviter, session, invitee_context),
    )

    notify(
        session,
        user_id=request.user_id,
        topic_action="event:invite_organizer",
        key=event.id,
        data=payload,
    )

    return empty_pb2.Empty()

1159 

def RemoveEventOrganizer(self, request, context, session):
    """Remove an organizer from an event.

    The organizer removed is ``request.user_id`` when that field is set,
    otherwise the calling user.

    Aborts with:
      * ``NOT_FOUND`` if the event cannot be found;
      * ``PERMISSION_DENIED`` if the occurrence is cancelled;
      * ``FAILED_PRECONDITION`` if the occurrence ended more than 24 hours
        ago, if ``_can_edit_event`` rejects the caller, if the target is
        the event's owner, or if the target is not an organizer.

    Returns an empty message on success.
    """
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not res:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = res

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    # Check permissions: either an organizer removing an organizer OR you're the event owner.
    # This runs before any check on the target user so that callers without
    # edit rights get the permission error rather than one describing the target.
    # NOTE(review): other RPCs in this file report this error with
    # PERMISSION_DENIED; FAILED_PRECONDITION is kept here so existing
    # callers see the same status code.
    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_EDIT_PERMISSION_DENIED)

    # Determine which user to remove
    user_id_to_remove = request.user_id.value if request.HasField("user_id") else context.user_id

    # Check if the target user is the event owner (only after permission check)
    if event.owner_user_id == user_id_to_remove:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_REMOVE_OWNER_AS_ORGANIZER)

    # Find the organizer to remove
    organizer_to_remove = session.execute(
        select(EventOrganizer)
        .where(EventOrganizer.user_id == user_id_to_remove)
        .where(EventOrganizer.event_id == event.id)
    ).scalar_one_or_none()

    if not organizer_to_remove:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_NOT_AN_ORGANIZER)

    session.delete(organizer_to_remove)

    return empty_pb2.Empty()