Coverage for src/couchers/servicers/events.py: 83%

494 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-12-20 18:03 +0000

1import logging 

2from datetime import timedelta 

3from types import SimpleNamespace 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from psycopg2.extras import DateTimeTZRange 

8from sqlalchemy.sql import and_, func, or_, select, update 

9 

10from couchers import errors 

11from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope 

12from couchers.jobs.enqueue import queue_job 

13from couchers.models import ( 

14 AttendeeStatus, 

15 Cluster, 

16 ClusterSubscription, 

17 Event, 

18 EventCommunityInviteRequest, 

19 EventOccurrence, 

20 EventOccurrenceAttendee, 

21 EventOrganizer, 

22 EventSubscription, 

23 Node, 

24 Thread, 

25 Upload, 

26 User, 

27) 

28from couchers.notifications.notify import notify 

29from couchers.servicers.api import user_model_to_pb 

30from couchers.servicers.blocking import are_blocked 

31from couchers.servicers.threads import thread_to_pb 

32from couchers.sql import couchers_select as select 

33from couchers.tasks import send_event_community_invite_request_email 

34from couchers.utils import ( 

35 Timestamp_from_datetime, 

36 create_coordinate, 

37 dt_from_millis, 

38 millis_from_dt, 

39 now, 

40 to_aware_datetime, 

41) 

42from proto import events_pb2, events_pb2_grpc, notification_data_pb2 

43from proto.internal import jobs_pb2 

44 

45logger = logging.getLogger(__name__) 

46 

47attendancestate2sql = { 

48 events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None, 

49 events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going, 

50 events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe, 

51} 

52 

53attendancestate2api = { 

54 None: events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING, 

55 AttendeeStatus.going: events_pb2.AttendanceState.ATTENDANCE_STATE_GOING, 

56 AttendeeStatus.maybe: events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE, 

57} 

58 

59MAX_PAGINATION_LENGTH = 25 

60 

61 

62def _is_event_owner(event: Event, user_id): 

63 """ 

64 Checks whether the user can act as an owner of the event 

65 """ 

66 if event.owner_user: 

67 return event.owner_user_id == user_id 

68 # otherwise owned by a cluster 

69 return event.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None 

70 

71 

72def _can_moderate_event(session, event: Event, user_id): 

73 # if the event is owned by a cluster, then any moderator of that cluster can moderate this event 

74 if event.owner_cluster is not None and can_moderate_node(session, user_id, event.owner_cluster.parent_node_id): 

75 return True 

76 

77 # finally check if the user can moderate the parent node of the cluster 

78 return can_moderate_node(session, user_id, event.parent_node_id) 

79 

80 

81def _can_edit_event(session, event, user_id): 

82 return _is_event_owner(event, user_id) or _can_moderate_event(session, event, user_id) 

83 

84 

85def event_to_pb(session, occurrence: EventOccurrence, context): 

86 event = occurrence.event 

87 

88 next_occurrence = ( 

89 event.occurrences.where(EventOccurrence.end_time >= now()).order_by(EventOccurrence.end_time.asc()).first() 

90 ) 

91 

92 owner_community_id = None 

93 owner_group_id = None 

94 if event.owner_cluster: 

95 if event.owner_cluster.is_official_cluster: 

96 owner_community_id = event.owner_cluster.parent_node_id 

97 else: 

98 owner_group_id = event.owner_cluster.id 

99 

100 attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == context.user_id).one_or_none() 

101 attendance_state = attendance.attendee_status if attendance else None 

102 

103 can_moderate = _can_moderate_event(session, event, context.user_id) 

104 can_edit = _can_edit_event(session, event, context.user_id) 

105 

106 going_count = session.execute( 

107 select(func.count()) 

108 .select_from(EventOccurrenceAttendee) 

109 .where_users_column_visible(context, EventOccurrenceAttendee.user_id) 

110 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

111 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.going) 

112 ).scalar_one() 

113 maybe_count = session.execute( 

114 select(func.count()) 

115 .select_from(EventOccurrenceAttendee) 

116 .where_users_column_visible(context, EventOccurrenceAttendee.user_id) 

117 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

118 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe) 

119 ).scalar_one() 

120 

121 organizer_count = session.execute( 

122 select(func.count()) 

123 .select_from(EventOrganizer) 

124 .where_users_column_visible(context, EventOrganizer.user_id) 

125 .where(EventOrganizer.event_id == event.id) 

126 ).scalar_one() 

127 subscriber_count = session.execute( 

128 select(func.count()) 

129 .select_from(EventSubscription) 

130 .where_users_column_visible(context, EventSubscription.user_id) 

131 .where(EventSubscription.event_id == event.id) 

132 ).scalar_one() 

133 

134 return events_pb2.Event( 

135 event_id=occurrence.id, 

136 is_next=False if not next_occurrence else occurrence.id == next_occurrence.id, 

137 is_cancelled=occurrence.is_cancelled, 

138 is_deleted=occurrence.is_deleted, 

139 title=event.title, 

140 slug=event.slug, 

141 content=occurrence.content, 

142 photo_url=occurrence.photo.full_url if occurrence.photo else None, 

143 online_information=( 

144 events_pb2.OnlineEventInformation( 

145 link=occurrence.link, 

146 ) 

147 if occurrence.link 

148 else None 

149 ), 

150 offline_information=( 

151 events_pb2.OfflineEventInformation( 

152 lat=occurrence.coordinates[0], 

153 lng=occurrence.coordinates[1], 

154 address=occurrence.address, 

155 ) 

156 if occurrence.geom 

157 else None 

158 ), 

159 created=Timestamp_from_datetime(occurrence.created), 

160 last_edited=Timestamp_from_datetime(occurrence.last_edited), 

161 creator_user_id=occurrence.creator_user_id, 

162 start_time=Timestamp_from_datetime(occurrence.start_time), 

163 end_time=Timestamp_from_datetime(occurrence.end_time), 

164 timezone=occurrence.timezone, 

165 start_time_display=str(occurrence.start_time), 

166 end_time_display=str(occurrence.end_time), 

167 attendance_state=attendancestate2api[attendance_state], 

168 organizer=event.organizers.where(EventOrganizer.user_id == context.user_id).one_or_none() is not None, 

169 subscriber=event.subscribers.where(EventSubscription.user_id == context.user_id).one_or_none() is not None, 

170 going_count=going_count, 

171 maybe_count=maybe_count, 

172 organizer_count=organizer_count, 

173 subscriber_count=subscriber_count, 

174 owner_user_id=event.owner_user_id, 

175 owner_community_id=owner_community_id, 

176 owner_group_id=owner_group_id, 

177 thread=thread_to_pb(session, event.thread_id), 

178 can_edit=can_edit, 

179 can_moderate=can_moderate, 

180 ) 

181 

182 

183def _get_event_and_occurrence_query(occurrence_id, include_deleted: bool): 

184 query = ( 

185 select(Event, EventOccurrence) 

186 .where(EventOccurrence.id == occurrence_id) 

187 .where(EventOccurrence.event_id == Event.id) 

188 ) 

189 

190 if not include_deleted: 

191 query = query.where(~EventOccurrence.is_deleted) 

192 

193 return query 

194 

195 

196def _get_event_and_occurrence_one( 

197 session, occurrence_id, include_deleted: bool = False 

198) -> tuple[Event, EventOccurrence]: 

199 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one() 

200 

201 

202def _get_event_and_occurrence_one_or_none( 

203 session, occurrence_id, include_deleted: bool = False 

204) -> tuple[Event, EventOccurrence] | None: 

205 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one_or_none() 

206 

207 

208def _check_occurrence_time_validity(start_time, end_time, context): 

209 if start_time < now(): 

210 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_IN_PAST) 

211 if end_time < start_time: 

212 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_ENDS_BEFORE_STARTS) 

213 if end_time - start_time > timedelta(days=7): 

214 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_LONG) 

215 if start_time - now() > timedelta(days=365): 

216 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_FAR_IN_FUTURE) 

217 

218 

219def get_users_to_notify_for_new_event(session, occurrence): 

220 """ 

221 Returns the users to notify, as well as the community id that is being notified (None if based on geo search) 

222 """ 

223 cluster = occurrence.event.parent_node.official_cluster 

224 if cluster.parent_node_id == 1: 

225 logger.info("The Global Community is too big for email notifications.") 

226 return [], occurrence.event.parent_node_id 

227 elif occurrence.creator_user in cluster.admins or cluster.is_leaf: 

228 return list(cluster.members.where(User.is_visible)), occurrence.event.parent_node_id 

229 else: 

230 max_radius = 20000 # m 

231 users = ( 

232 session.execute( 

233 select(User) 

234 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

235 .where(User.is_visible) 

236 .where(ClusterSubscription.cluster_id == cluster.id) 

237 .where(func.ST_DWithin(User.geom, occurrence.geom, max_radius / 111111)) 

238 ) 

239 .scalars() 

240 .all() 

241 ) 

242 return users, None 

243 

244 

245def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload): 

246 """ 

247 Background job to generated/fan out event notifications 

248 """ 

249 from couchers.servicers.communities import community_to_pb 

250 

251 logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}") 

252 

253 with session_scope() as session: 

254 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id) 

255 creator = occurrence.creator_user 

256 

257 users, node_id = get_users_to_notify_for_new_event(session, occurrence) 

258 

259 inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none() 

260 

261 if not inviting_user: 

262 logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?") 

263 return 

264 

265 for user in users: 

266 if are_blocked(session, user.id, creator.id): 

267 continue 

268 context = SimpleNamespace(user_id=user.id) 

269 notify( 

270 session, 

271 user_id=user.id, 

272 topic_action="event:create_approved" if payload.approved else "event:create_any", 

273 key=payload.occurrence_id, 

274 data=notification_data_pb2.EventCreate( 

275 event=event_to_pb(session, occurrence, context), 

276 inviting_user=user_model_to_pb(inviting_user, session, context), 

277 nearby=True if node_id is None else None, 

278 in_community=community_to_pb(session, event.parent_node, context) if node_id is not None else None, 

279 ), 

280 ) 

281 

282 

283def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload): 

284 with session_scope() as session: 

285 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id) 

286 

287 updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one_or_none() 

288 

289 subscribed_user_ids = [user.id for user in event.subscribers] 

290 attending_user_ids = [user.user_id for user in occurrence.attendances] 

291 

292 for user_id in set(subscribed_user_ids + attending_user_ids): 

293 logger.info(user_id) 

294 if are_blocked(session, user_id, updating_user.id): 

295 continue 

296 context = SimpleNamespace(user_id=user_id) 

297 notify( 

298 session, 

299 user_id=user_id, 

300 topic_action="event:update", 

301 key=payload.occurrence_id, 

302 data=notification_data_pb2.EventUpdate( 

303 event=event_to_pb(session, occurrence, context), 

304 updating_user=user_model_to_pb(updating_user, session, context), 

305 updated_items=payload.updated_items, 

306 ), 

307 ) 

308 

309 

310def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload): 

311 with session_scope() as session: 

312 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id) 

313 

314 cancelling_user = session.execute( 

315 select(User).where(User.id == payload.cancelling_user_id) 

316 ).scalar_one_or_none() 

317 

318 subscribed_user_ids = [user.id for user in event.subscribers] 

319 attending_user_ids = [user.user_id for user in occurrence.attendances] 

320 

321 for user_id in set(subscribed_user_ids + attending_user_ids): 

322 logger.info(user_id) 

323 if are_blocked(session, user_id, cancelling_user.id): 

324 continue 

325 context = SimpleNamespace(user_id=user_id) 

326 notify( 

327 session, 

328 user_id=user_id, 

329 topic_action="event:cancel", 

330 key=payload.occurrence_id, 

331 data=notification_data_pb2.EventCancel( 

332 event=event_to_pb(session, occurrence, context), 

333 cancelling_user=user_model_to_pb(cancelling_user, session, context), 

334 ), 

335 ) 

336 

337 

338def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload): 

339 with session_scope() as session: 

340 event, occurrence = _get_event_and_occurrence_one( 

341 session, occurrence_id=payload.occurrence_id, include_deleted=True 

342 ) 

343 

344 subscribed_user_ids = [user.id for user in event.subscribers] 

345 attending_user_ids = [user.user_id for user in occurrence.attendances] 

346 

347 for user_id in set(subscribed_user_ids + attending_user_ids): 

348 logger.info(user_id) 

349 context = SimpleNamespace(user_id=user_id) 

350 notify( 

351 session, 

352 user_id=user_id, 

353 topic_action="event:delete", 

354 key=payload.occurrence_id, 

355 data=notification_data_pb2.EventDelete( 

356 event=event_to_pb(session, occurrence, context), 

357 ), 

358 ) 

359 

360 

361class Events(events_pb2_grpc.EventsServicer): 

362 def CreateEvent(self, request, context, session): 

363 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

364 if not user.has_completed_profile: 

365 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_CREATE_EVENT) 

366 if not request.title: 

367 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_TITLE) 

368 if not request.content: 

369 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT) 

370 if request.HasField("online_information"): 

371 online = True 

372 geom = None 

373 address = None 

374 if not request.online_information.link: 

375 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK) 

376 link = request.online_information.link 

377 elif request.HasField("offline_information"): 

378 online = False 

379 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value 

380 if not ( 

381 request.offline_information.address 

382 and request.offline_information.lat 

383 and request.offline_information.lng 

384 ): 

385 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION) 

386 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 

387 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

388 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

389 address = request.offline_information.address 

390 link = None 

391 else: 

392 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK) 

393 

394 start_time = to_aware_datetime(request.start_time) 

395 end_time = to_aware_datetime(request.end_time) 

396 

397 _check_occurrence_time_validity(start_time, end_time, context) 

398 

399 if request.parent_community_id: 

400 parent_node = session.execute( 

401 select(Node).where(Node.id == request.parent_community_id) 

402 ).scalar_one_or_none() 

403 else: 

404 if online: 

405 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_MISSING_PARENT_COMMUNITY) 

406 # parent community computed from geom 

407 parent_node = get_parent_node_at_location(session, geom) 

408 

409 if not parent_node: 

410 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND) 

411 

412 if ( 

413 request.photo_key 

414 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

415 ): 

416 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

417 

418 event = Event( 

419 title=request.title, 

420 parent_node_id=parent_node.id, 

421 owner_user_id=context.user_id, 

422 thread=Thread(), 

423 creator_user_id=context.user_id, 

424 ) 

425 session.add(event) 

426 

427 occurrence = EventOccurrence( 

428 event=event, 

429 content=request.content, 

430 geom=geom, 

431 address=address, 

432 link=link, 

433 photo_key=request.photo_key if request.photo_key != "" else None, 

434 # timezone=timezone, 

435 during=DateTimeTZRange(start_time, end_time), 

436 creator_user_id=context.user_id, 

437 ) 

438 session.add(occurrence) 

439 

440 session.add( 

441 EventOrganizer( 

442 user_id=context.user_id, 

443 event=event, 

444 ) 

445 ) 

446 

447 session.add( 

448 EventSubscription( 

449 user_id=context.user_id, 

450 event=event, 

451 ) 

452 ) 

453 

454 session.add( 

455 EventOccurrenceAttendee( 

456 user_id=context.user_id, 

457 occurrence=occurrence, 

458 attendee_status=AttendeeStatus.going, 

459 ) 

460 ) 

461 

462 session.commit() 

463 

464 if user.has_completed_profile: 

465 queue_job( 

466 session, 

467 "generate_event_create_notifications", 

468 payload=jobs_pb2.GenerateEventCreateNotificationsPayload( 

469 inviting_user_id=user.id, 

470 occurrence_id=occurrence.id, 

471 approved=False, 

472 ), 

473 ) 

474 

475 return event_to_pb(session, occurrence, context) 

476 

477 def ScheduleEvent(self, request, context, session): 

478 if not request.content: 

479 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT) 

480 if request.HasField("online_information"): 

481 geom = None 

482 address = None 

483 link = request.online_information.link 

484 elif request.HasField("offline_information"): 

485 if not ( 

486 request.offline_information.address 

487 and request.offline_information.lat 

488 and request.offline_information.lng 

489 ): 

490 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION) 

491 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 

492 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

493 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

494 address = request.offline_information.address 

495 link = None 

496 else: 

497 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK) 

498 

499 start_time = to_aware_datetime(request.start_time) 

500 end_time = to_aware_datetime(request.end_time) 

501 

502 _check_occurrence_time_validity(start_time, end_time, context) 

503 

504 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

505 if not res: 

506 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

507 

508 event, occurrence = res 

509 

510 if not _can_edit_event(session, event, context.user_id): 

511 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED) 

512 

513 if occurrence.is_cancelled: 

514 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT) 

515 

516 if ( 

517 request.photo_key 

518 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

519 ): 

520 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

521 

522 during = DateTimeTZRange(start_time, end_time) 

523 

524 # && is the overlap operator for ranges 

525 if ( 

526 session.execute( 

527 select(EventOccurrence.id) 

528 .where(EventOccurrence.event_id == event.id) 

529 .where(EventOccurrence.during.op("&&")(during)) 

530 ) 

531 .scalars() 

532 .first() 

533 is not None 

534 ): 

535 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP) 

536 

537 occurrence = EventOccurrence( 

538 event=event, 

539 content=request.content, 

540 geom=geom, 

541 address=address, 

542 link=link, 

543 photo_key=request.photo_key if request.photo_key != "" else None, 

544 # timezone=timezone, 

545 during=during, 

546 creator_user_id=context.user_id, 

547 ) 

548 session.add(occurrence) 

549 

550 session.add( 

551 EventOccurrenceAttendee( 

552 user_id=context.user_id, 

553 occurrence=occurrence, 

554 attendee_status=AttendeeStatus.going, 

555 ) 

556 ) 

557 

558 session.flush() 

559 

560 # TODO: notify 

561 

562 return event_to_pb(session, occurrence, context) 

563 

564 def UpdateEvent(self, request, context, session): 

565 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

566 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

567 if not res: 

568 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

569 

570 event, occurrence = res 

571 

572 if not _can_edit_event(session, event, context.user_id): 

573 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED) 

574 

575 # the things that were updated and need to be notified about 

576 notify_updated = [] 

577 

578 if occurrence.is_cancelled: 

579 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT) 

580 

581 occurrence_update = {"last_edited": now()} 

582 

583 if request.HasField("title"): 

584 notify_updated.append("title") 

585 event.title = request.title.value 

586 event.last_edited = now() 

587 

588 if request.HasField("content"): 

589 notify_updated.append("content") 

590 occurrence_update["content"] = request.content.value 

591 

592 if request.HasField("photo_key"): 

593 occurrence_update["photo_key"] = request.photo_key.value 

594 

595 if request.HasField("online_information"): 

596 notify_updated.append("location") 

597 if not request.online_information.link: 

598 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK) 

599 occurrence_update["link"] = request.online_information.link 

600 occurrence_update["geom"] = None 

601 occurrence_update["address"] = None 

602 elif request.HasField("offline_information"): 

603 notify_updated.append("location") 

604 occurrence_update["link"] = None 

605 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 

606 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

607 occurrence_update["geom"] = create_coordinate( 

608 request.offline_information.lat, request.offline_information.lng 

609 ) 

610 occurrence_update["address"] = request.offline_information.address 

611 

612 if request.HasField("start_time") or request.HasField("end_time"): 

613 if request.update_all_future: 

614 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_CANT_UPDATE_ALL_TIMES) 

615 if request.HasField("start_time"): 

616 notify_updated.append("start time") 

617 start_time = to_aware_datetime(request.start_time) 

618 else: 

619 start_time = occurrence.start_time 

620 if request.HasField("end_time"): 

621 notify_updated.append("end time") 

622 end_time = to_aware_datetime(request.end_time) 

623 else: 

624 end_time = occurrence.end_time 

625 

626 _check_occurrence_time_validity(start_time, end_time, context) 

627 

628 during = DateTimeTZRange(start_time, end_time) 

629 

630 # && is the overlap operator for ranges 

631 if ( 

632 session.execute( 

633 select(EventOccurrence.id) 

634 .where(EventOccurrence.event_id == event.id) 

635 .where(EventOccurrence.id != occurrence.id) 

636 .where(EventOccurrence.during.op("&&")(during)) 

637 ) 

638 .scalars() 

639 .first() 

640 is not None 

641 ): 

642 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP) 

643 

644 occurrence_update["during"] = during 

645 

646 # TODO 

647 # if request.HasField("timezone"): 

648 # occurrence_update["timezone"] = request.timezone 

649 

650 # allow editing any event which hasn't ended more than 24 hours before now 

651 # when editing all future events, we edit all which have not yet ended 

652 

653 if request.update_all_future: 

654 session.execute( 

655 update(EventOccurrence) 

656 .where(EventOccurrence.end_time >= now() - timedelta(hours=24)) 

657 .where(EventOccurrence.start_time >= occurrence.start_time) 

658 .values(occurrence_update) 

659 .execution_options(synchronize_session=False) 

660 ) 

661 else: 

662 if occurrence.end_time < now() - timedelta(hours=24): 

663 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT) 

664 session.execute( 

665 update(EventOccurrence) 

666 .where(EventOccurrence.end_time >= now() - timedelta(hours=24)) 

667 .where(EventOccurrence.id == occurrence.id) 

668 .values(occurrence_update) 

669 .execution_options(synchronize_session=False) 

670 ) 

671 

672 session.flush() 

673 

674 if notify_updated: 

675 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying") 

676 

677 queue_job( 

678 session, 

679 "generate_event_update_notifications", 

680 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload( 

681 updating_user_id=user.id, 

682 occurrence_id=occurrence.id, 

683 updated_items=notify_updated, 

684 ), 

685 ) 

686 

687 # since we have synchronize_session=False, we have to refresh the object 

688 session.refresh(occurrence) 

689 

690 return event_to_pb(session, occurrence, context) 

691 

692 def GetEvent(self, request, context, session): 

693 occurrence = session.execute( 

694 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted) 

695 ).scalar_one_or_none() 

696 

697 if not occurrence: 

698 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

699 

700 return event_to_pb(session, occurrence, context) 

701 

702 def CancelEvent(self, request, context, session): 

703 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

704 if not res: 

705 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

706 

707 event, occurrence = res 

708 

709 if not _can_edit_event(session, event, context.user_id): 

710 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED) 

711 

712 if occurrence.end_time < now() - timedelta(hours=24): 

713 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_CANCEL_OLD_EVENT) 

714 

715 occurrence.is_cancelled = True 

716 

717 queue_job( 

718 session, 

719 "generate_event_cancel_notifications", 

720 payload=jobs_pb2.GenerateEventCancelNotificationsPayload( 

721 cancelling_user_id=context.user_id, 

722 occurrence_id=occurrence.id, 

723 ), 

724 ) 

725 

726 return empty_pb2.Empty() 

727 

728 def RequestCommunityInvite(self, request, context, session): 

729 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

730 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

731 if not res: 

732 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

733 

734 event, occurrence = res 

735 

736 if not _can_edit_event(session, event, context.user_id): 

737 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED) 

738 

739 if occurrence.is_cancelled: 

740 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT) 

741 

742 if occurrence.end_time < now() - timedelta(hours=24): 

743 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT) 

744 

745 this_user_reqs = [req for req in occurrence.community_invite_requests if req.user_id == context.user_id] 

746 

747 if len(this_user_reqs) > 0: 

748 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_REQUESTED) 

749 

750 approved_reqs = [req for req in occurrence.community_invite_requests if req.approved] 

751 

752 if len(approved_reqs) > 0: 

753 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_APPROVED) 

754 

755 request = EventCommunityInviteRequest( 

756 occurrence_id=request.event_id, 

757 user_id=context.user_id, 

758 ) 

759 session.add(request) 

760 session.flush() 

761 

762 send_event_community_invite_request_email(session, request) 

763 

764 return empty_pb2.Empty() 

765 

766 def ListEventOccurrences(self, request, context, session): 

767 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

768 # the page token is a unix timestamp of where we left off 

769 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

770 occurrence = session.execute( 

771 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted) 

772 ).scalar_one_or_none() 

773 if not occurrence: 

774 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

775 

776 occurrences = ( 

777 select(EventOccurrence).where(EventOccurrence.event_id == Event.id).where(~EventOccurrence.is_deleted) 

778 ) 

779 

780 if not request.include_cancelled: 

781 occurrences = occurrences.where(~EventOccurrence.is_cancelled) 

782 

783 if not request.past: 

784 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

785 EventOccurrence.start_time.asc() 

786 ) 

787 else: 

788 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

789 EventOccurrence.start_time.desc() 

790 ) 

791 

792 occurrences = occurrences.limit(page_size + 1) 

793 occurrences = session.execute(occurrences).scalars().all() 

794 

795 return events_pb2.ListEventOccurrencesRes( 

796 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

797 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

798 ) 

799 

800 def ListEventAttendees(self, request, context, session): 

801 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

802 next_user_id = int(request.page_token) if request.page_token else 0 

803 occurrence = session.execute( 

804 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted) 

805 ).scalar_one_or_none() 

806 if not occurrence: 

807 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

808 attendees = ( 

809 session.execute( 

810 select(EventOccurrenceAttendee) 

811 .where_users_column_visible(context, EventOccurrenceAttendee.user_id) 

812 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

813 .where(EventOccurrenceAttendee.user_id >= next_user_id) 

814 .order_by(EventOccurrenceAttendee.user_id) 

815 .limit(page_size + 1) 

816 ) 

817 .scalars() 

818 .all() 

819 ) 

820 return events_pb2.ListEventAttendeesRes( 

821 attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]], 

822 next_page_token=str(attendees[-1].user_id) if len(attendees) > page_size else None, 

823 ) 

824 

825 def ListEventSubscribers(self, request, context, session): 

826 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

827 next_user_id = int(request.page_token) if request.page_token else 0 

828 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

829 if not res: 

830 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

831 event, occurrence = res 

832 subscribers = ( 

833 session.execute( 

834 select(EventSubscription) 

835 .where_users_column_visible(context, EventSubscription.user_id) 

836 .where(EventSubscription.event_id == event.id) 

837 .where(EventSubscription.user_id >= next_user_id) 

838 .order_by(EventSubscription.user_id) 

839 .limit(page_size + 1) 

840 ) 

841 .scalars() 

842 .all() 

843 ) 

844 return events_pb2.ListEventSubscribersRes( 

845 subscriber_user_ids=[subscriber.user_id for subscriber in subscribers[:page_size]], 

846 next_page_token=str(subscribers[-1].user_id) if len(subscribers) > page_size else None, 

847 ) 

848 

849 def ListEventOrganizers(self, request, context, session): 

850 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

851 next_user_id = int(request.page_token) if request.page_token else 0 

852 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

853 if not res: 

854 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

855 event, occurrence = res 

856 organizers = ( 

857 session.execute( 

858 select(EventOrganizer) 

859 .where_users_column_visible(context, EventOrganizer.user_id) 

860 .where(EventOrganizer.event_id == event.id) 

861 .where(EventOrganizer.user_id >= next_user_id) 

862 .order_by(EventOrganizer.user_id) 

863 .limit(page_size + 1) 

864 ) 

865 .scalars() 

866 .all() 

867 ) 

868 return events_pb2.ListEventOrganizersRes( 

869 organizer_user_ids=[organizer.user_id for organizer in organizers[:page_size]], 

870 next_page_token=str(organizers[-1].user_id) if len(organizers) > page_size else None, 

871 ) 

872 

873 def TransferEvent(self, request, context, session): 

874 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

875 if not res: 

876 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

877 

878 event, occurrence = res 

879 

880 if not _can_edit_event(session, event, context.user_id): 

881 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_TRANSFER_PERMISSION_DENIED) 

882 

883 if occurrence.is_cancelled: 

884 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT) 

885 

886 if occurrence.end_time < now() - timedelta(hours=24): 

887 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT) 

888 

889 if request.WhichOneof("new_owner") == "new_owner_group_id": 

890 cluster = session.execute( 

891 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id) 

892 ).scalar_one_or_none() 

893 elif request.WhichOneof("new_owner") == "new_owner_community_id": 

894 cluster = session.execute( 

895 select(Cluster) 

896 .where(Cluster.parent_node_id == request.new_owner_community_id) 

897 .where(Cluster.is_official_cluster) 

898 ).scalar_one_or_none() 

899 

900 if not cluster: 

901 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND) 

902 

903 event.owner_user = None 

904 event.owner_cluster = cluster 

905 

906 session.commit() 

907 return event_to_pb(session, occurrence, context) 

908 

909 def SetEventSubscription(self, request, context, session): 

910 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

911 if not res: 

912 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

913 

914 event, occurrence = res 

915 

916 if occurrence.is_cancelled: 

917 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT) 

918 

919 if occurrence.end_time < now() - timedelta(hours=24): 

920 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT) 

921 

922 current_subscription = session.execute( 

923 select(EventSubscription) 

924 .where(EventSubscription.user_id == context.user_id) 

925 .where(EventSubscription.event_id == event.id) 

926 ).scalar_one_or_none() 

927 

928 # if not subscribed, subscribe 

929 if request.subscribe and not current_subscription: 

930 session.add(EventSubscription(user_id=context.user_id, event_id=event.id)) 

931 

932 # if subscribed but unsubbing, remove subscription 

933 if not request.subscribe and current_subscription: 

934 session.delete(current_subscription) 

935 

936 session.flush() 

937 

938 return event_to_pb(session, occurrence, context) 

939 

940 def SetEventAttendance(self, request, context, session): 

941 occurrence = session.execute( 

942 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted) 

943 ).scalar_one_or_none() 

944 

945 if not occurrence: 

946 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

947 

948 if occurrence.is_cancelled: 

949 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT) 

950 

951 if occurrence.end_time < now() - timedelta(hours=24): 

952 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT) 

953 

954 current_attendance = session.execute( 

955 select(EventOccurrenceAttendee) 

956 .where(EventOccurrenceAttendee.user_id == context.user_id) 

957 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

958 ).scalar_one_or_none() 

959 

960 if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING: 

961 if current_attendance: 

962 session.delete(current_attendance) 

963 # if unset/not going, nothing to do! 

964 else: 

965 if current_attendance: 

966 current_attendance.attendee_status = attendancestate2sql[request.attendance_state] 

967 else: 

968 # create new 

969 attendance = EventOccurrenceAttendee( 

970 user_id=context.user_id, 

971 occurrence_id=occurrence.id, 

972 attendee_status=attendancestate2sql[request.attendance_state], 

973 ) 

974 session.add(attendance) 

975 

976 session.flush() 

977 

978 return event_to_pb(session, occurrence, context) 

979 

980 def ListMyEvents(self, request, context, session): 

981 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

982 # the page token is a unix timestamp of where we left off 

983 page_token = ( 

984 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now() 

985 ) 

986 # the page number is the page number we are on 

987 page_number = request.page_number or 1 

988 # Calculate the offset for pagination 

989 offset = (page_number - 1) * page_size 

990 occurrences = ( 

991 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted) 

992 ) 

993 

994 include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities) 

995 include_subscribed = request.subscribed or include_all 

996 include_organizing = request.organizing or include_all 

997 include_attending = request.attending or include_all 

998 include_my_communities = request.my_communities or include_all 

999 

1000 where_ = [] 

1001 

1002 if include_subscribed: 

1003 occurrences = occurrences.outerjoin( 

1004 EventSubscription, 

1005 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id), 

1006 ) 

1007 where_.append(EventSubscription.user_id != None) 

1008 if include_organizing: 

1009 occurrences = occurrences.outerjoin( 

1010 EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id) 

1011 ) 

1012 where_.append(EventOrganizer.user_id != None) 

1013 if include_attending: 

1014 occurrences = occurrences.outerjoin( 

1015 EventOccurrenceAttendee, 

1016 and_( 

1017 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id, 

1018 EventOccurrenceAttendee.user_id == context.user_id, 

1019 ), 

1020 ) 

1021 where_.append(EventOccurrenceAttendee.user_id != None) 

1022 if include_my_communities: 

1023 my_communities = ( 

1024 session.execute( 

1025 select(Node.id) 

1026 .join(Cluster, Cluster.parent_node_id == Node.id) 

1027 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

1028 .where(ClusterSubscription.user_id == context.user_id) 

1029 .where(Cluster.is_official_cluster) 

1030 .order_by(Node.id) 

1031 .limit(100000) 

1032 ) 

1033 .scalars() 

1034 .all() 

1035 ) 

1036 where_.append(Event.parent_node_id.in_(my_communities)) 

1037 

1038 occurrences = occurrences.where(or_(*where_)) 

1039 

1040 if not request.include_cancelled: 

1041 occurrences = occurrences.where(~EventOccurrence.is_cancelled) 

1042 

1043 if not request.past: 

1044 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

1045 EventOccurrence.start_time.asc() 

1046 ) 

1047 else: 

1048 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

1049 EventOccurrence.start_time.desc() 

1050 ) 

1051 # Count the total number of items for pagination 

1052 total_items = session.execute(select(func.count()).select_from(occurrences.subquery())).scalar() 

1053 # Apply pagination by page number 

1054 occurrences = ( 

1055 occurrences.offset(offset).limit(page_size) if request.page_number else occurrences.limit(page_size + 1) 

1056 ) 

1057 occurrences = session.execute(occurrences).scalars().all() 

1058 

1059 return events_pb2.ListMyEventsRes( 

1060 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

1061 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

1062 total_items=total_items, 

1063 ) 

1064 

1065 def ListAllEvents(self, request, context, session): 

1066 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

1067 # the page token is a unix timestamp of where we left off 

1068 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

1069 

1070 occurrences = ( 

1071 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted) 

1072 ) 

1073 

1074 if not request.include_cancelled: 

1075 occurrences = occurrences.where(~EventOccurrence.is_cancelled) 

1076 

1077 if not request.past: 

1078 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

1079 EventOccurrence.start_time.asc() 

1080 ) 

1081 else: 

1082 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

1083 EventOccurrence.start_time.desc() 

1084 ) 

1085 

1086 occurrences = occurrences.limit(page_size + 1) 

1087 occurrences = session.execute(occurrences).scalars().all() 

1088 

1089 return events_pb2.ListAllEventsRes( 

1090 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

1091 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

1092 ) 

1093 

1094 def InviteEventOrganizer(self, request, context, session): 

1095 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

1096 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

1097 if not res: 

1098 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

1099 

1100 event, occurrence = res 

1101 

1102 if not _can_edit_event(session, event, context.user_id): 

1103 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED) 

1104 

1105 if occurrence.is_cancelled: 

1106 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT) 

1107 

1108 if occurrence.end_time < now() - timedelta(hours=24): 

1109 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT) 

1110 

1111 if not session.execute( 

1112 select(User).where_users_visible(context).where(User.id == request.user_id) 

1113 ).scalar_one_or_none(): 

1114 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

1115 

1116 session.add( 

1117 EventOrganizer( 

1118 user_id=request.user_id, 

1119 event=event, 

1120 ) 

1121 ) 

1122 session.flush() 

1123 

1124 other_user_context = SimpleNamespace(user_id=request.user_id) 

1125 

1126 notify( 

1127 session, 

1128 user_id=request.user_id, 

1129 topic_action="event:invite_organizer", 

1130 key=event.id, 

1131 data=notification_data_pb2.EventInviteOrganizer( 

1132 event=event_to_pb(session, occurrence, other_user_context), 

1133 inviting_user=user_model_to_pb(user, session, other_user_context), 

1134 ), 

1135 ) 

1136 

1137 return empty_pb2.Empty() 

1138 

1139 def RemoveEventOrganizer(self, request, context, session): 

1140 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

1141 if not res: 

1142 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

1143 

1144 event, occurrence = res 

1145 

1146 if occurrence.is_cancelled: 

1147 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT) 

1148 

1149 if occurrence.end_time < now() - timedelta(hours=24): 

1150 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT) 

1151 

1152 if event.owner_user_id == context.user_id: 

1153 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_REMOVE_OWNER_AS_ORGANIZER) 

1154 

1155 current = session.execute( 

1156 select(EventOrganizer) 

1157 .where(EventOrganizer.user_id == context.user_id) 

1158 .where(EventOrganizer.event_id == event.id) 

1159 ).scalar_one_or_none() 

1160 

1161 if not current: 

1162 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_NOT_AN_ORGANIZER) 

1163 

1164 session.delete(current) 

1165 

1166 return empty_pb2.Empty()