Coverage for app/backend/src/couchers/servicers/events.py: 84%

552 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import logging 

2from datetime import datetime, timedelta 

3from typing import Any, cast 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from psycopg.types.range import TimestamptzRange 

8from sqlalchemy import Select, select 

9from sqlalchemy.orm import Session 

10from sqlalchemy.sql import and_, func, or_, update 

11 

12from couchers.context import CouchersContext, make_notification_user_context 

13from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope 

14from couchers.event_log import log_event 

15from couchers.helpers.completed_profile import has_completed_profile 

16from couchers.jobs.enqueue import queue_job 

17from couchers.models import ( 

18 AttendeeStatus, 

19 Cluster, 

20 ClusterSubscription, 

21 Event, 

22 EventCommunityInviteRequest, 

23 EventOccurrence, 

24 EventOccurrenceAttendee, 

25 EventOrganizer, 

26 EventSubscription, 

27 ModerationObjectType, 

28 Node, 

29 NodeType, 

30 Thread, 

31 Upload, 

32 User, 

33) 

34from couchers.models.notifications import NotificationTopicAction 

35from couchers.moderation.utils import create_moderation 

36from couchers.notifications.notify import notify 

37from couchers.proto import events_pb2, events_pb2_grpc, notification_data_pb2 

38from couchers.proto.internal import jobs_pb2 

39from couchers.servicers.api import user_model_to_pb 

40from couchers.servicers.blocking import is_not_visible 

41from couchers.servicers.threads import thread_to_pb 

42from couchers.sql import users_visible, where_moderated_content_visible, where_users_column_visible 

43from couchers.tasks import send_event_community_invite_request_email 

44from couchers.utils import ( 

45 Timestamp_from_datetime, 

46 create_coordinate, 

47 dt_from_millis, 

48 millis_from_dt, 

49 not_none, 

50 now, 

51 to_aware_datetime, 

52) 

53 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# API attendance enum -> stored attendee status. "Not going" has no AttendeeStatus member and maps to None.
attendancestate2sql = {
    events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
    events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
}

# Reverse direction: stored attendee status (or None) -> API attendance enum.
# The forward mapping is one-to-one, so inverting it gives exactly the reverse table.
attendancestate2api = {sql_status: api_state for api_state, sql_status in attendancestate2sql.items()}

# NOTE(review): presumably the upper bound on page sizes of list RPCs; it is not used in this excerpt.
MAX_PAGINATION_LENGTH = 25

67 

68 

def _is_event_owner(event: Event, user_id: int) -> bool:
    """
    Tells whether `user_id` can act as an owner of `event`.

    If the event has an owning user, only that user is an owner. Otherwise the event is
    expected to be owned by a cluster, and any admin of that cluster counts as an owner.
    """
    if not event.owner_user:
        # no owning user, so the event must be owned by a cluster
        owning_cluster = not_none(event.owner_cluster)
        matching_admin = owning_cluster.admins.where(User.id == user_id).one_or_none()
        return matching_admin is not None
    return event.owner_user_id == user_id

77 

78 

def _is_event_organizer(event: Event, user_id: int) -> bool:
    """
    Tells whether `user_id` is one of the organizers of `event`
    """
    organizer = event.organizers.where(EventOrganizer.user_id == user_id).one_or_none()
    return organizer is not None

84 

85 

def _can_moderate_event(session: Session, event: Event, user_id: int) -> bool:
    """
    Tells whether `user_id` has moderation rights over `event`.

    Two nodes are consulted through `can_moderate_node`: the parent node of the owning cluster
    (only when the event is owned by a cluster), then the event's own parent node.
    """
    owning_cluster = event.owner_cluster
    # a cluster-owned event can be moderated by anyone who can moderate that cluster's parent node
    if owning_cluster is not None and can_moderate_node(session, user_id, owning_cluster.parent_node_id):
        return True

    # otherwise fall back to the node the event itself is attached to
    return can_moderate_node(session, user_id, event.parent_node_id)

93 

94 

def _can_edit_event(session: Session, event: Event, user_id: int) -> bool:
    """
    Tells whether `user_id` may edit `event`: owners, organizers and moderators all can.

    The checks run in that order and stop at the first one that succeeds.
    """
    if _is_event_owner(event, user_id):
        return True
    if _is_event_organizer(event, user_id):
        return True
    return _can_moderate_event(session, event, user_id)

101 

102 

def event_to_pb(session: Session, occurrence: EventOccurrence, context: CouchersContext) -> events_pb2.Event:
    """
    Serializes one event occurrence into an `events_pb2.Event` message as seen by `context.user_id`.

    On top of the occurrence's and event's own fields, the message carries viewer-specific data:
    the viewer's attendance state, whether they organize or subscribe to the event, whether they
    can edit or moderate it, and going/organizer/subscriber counts that are passed through
    `where_users_column_visible` for this context.

    Note that `event_id` in the message is the id of the occurrence, not of the parent event.
    """
    event = occurrence.event
    viewer_id = context.user_id

    def count_visible(base_query: Any, user_id_column: Any) -> int:
        # runs a count query after `where_users_column_visible` has filtered it on `user_id_column`
        return session.execute(where_users_column_visible(base_query, context, user_id_column)).scalar_one()

    # the "next" occurrence is the one ending soonest among those that have not ended yet
    next_occurrence = (
        event.occurrences.where(EventOccurrence.end_time >= now())
        .order_by(EventOccurrence.end_time.asc())
        .limit(1)
        .one_or_none()
    )

    # cluster-owned events are reported as community-owned for official clusters, group-owned otherwise
    owner_community_id = None
    owner_group_id = None
    owning_cluster = event.owner_cluster
    if owning_cluster:
        if owning_cluster.is_official_cluster:
            owner_community_id = owning_cluster.parent_node_id
        else:
            owner_group_id = owning_cluster.id

    # no attendance row for the viewer is reported as "not going" (the None key of attendancestate2api)
    attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == viewer_id).one_or_none()
    attendance_state = attendance.attendee_status if attendance else None

    can_moderate = _can_moderate_event(session, event, viewer_id)
    can_edit = _can_edit_event(session, event, viewer_id)

    going_count = count_visible(
        select(func.count())
        .select_from(EventOccurrenceAttendee)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
        .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.going),
        EventOccurrenceAttendee.user_id,
    )
    organizer_count = count_visible(
        select(func.count()).select_from(EventOrganizer).where(EventOrganizer.event_id == event.id),
        EventOrganizer.user_id,
    )
    subscriber_count = count_visible(
        select(func.count()).select_from(EventSubscription).where(EventSubscription.event_id == event.id),
        EventSubscription.user_id,
    )

    # online information is only emitted when a link is stored, offline information only with a geometry
    online_information = events_pb2.OnlineEventInformation(link=occurrence.link) if occurrence.link else None

    offline_information = None
    if occurrence.geom:
        coordinates = not_none(occurrence.coordinates)
        offline_information = events_pb2.OfflineEventInformation(
            lat=coordinates[0],
            lng=coordinates[1],
            address=occurrence.address,
        )

    is_subscriber = event.subscribers.where(EventSubscription.user_id == viewer_id).one_or_none() is not None

    return events_pb2.Event(
        event_id=occurrence.id,
        is_next=bool(next_occurrence) and occurrence.id == next_occurrence.id,
        is_cancelled=occurrence.is_cancelled,
        is_deleted=occurrence.is_deleted,
        title=event.title,
        slug=event.slug,
        content=occurrence.content,
        photo_url=occurrence.photo.full_url if occurrence.photo else None,
        photo_key=occurrence.photo_key or "",
        online_information=online_information,
        offline_information=offline_information,
        created=Timestamp_from_datetime(occurrence.created),
        last_edited=Timestamp_from_datetime(occurrence.last_edited),
        creator_user_id=occurrence.creator_user_id,
        start_time=Timestamp_from_datetime(occurrence.start_time),
        end_time=Timestamp_from_datetime(occurrence.end_time),
        timezone=occurrence.timezone,
        attendance_state=attendancestate2api[attendance_state],
        organizer=_is_event_organizer(event, viewer_id),
        subscriber=is_subscriber,
        going_count=going_count,
        organizer_count=organizer_count,
        subscriber_count=subscriber_count,
        owner_user_id=event.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(session, context, event.thread_id),
        can_edit=can_edit,
        can_moderate=can_moderate,
    )

197 

198 

def _get_event_and_occurrence_query(
    occurrence_id: int,
    include_deleted: bool,
    context: CouchersContext | None = None,
) -> Select[tuple[Event, EventOccurrence]]:
    """
    Builds the select for one occurrence (looked up by id) together with the event it belongs to.

    Deleted occurrences are filtered out unless `include_deleted` is set. When a `context` is
    given, the query is additionally passed through `where_moderated_content_visible` as a
    non-list operation; without a context no visibility filtering is applied.
    """
    conditions = [
        EventOccurrence.id == occurrence_id,
        EventOccurrence.event_id == Event.id,
    ]
    if not include_deleted:
        conditions.append(~EventOccurrence.is_deleted)

    # several criteria passed to a single where() are AND-ed, same as chaining where() calls
    query = select(Event, EventOccurrence).where(*conditions)

    if context is None:
        return query
    return where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=False)

217 

218 

def _get_event_and_occurrence_one(
    session: Session, occurrence_id: int, include_deleted: bool = False
) -> tuple[Event, EventOccurrence]:
    """
    For background jobs only - no visibility filtering.

    Loads the (event, occurrence) pair for `occurrence_id`. The row is fetched with `.one()`,
    so anything other than exactly one matching row raises.
    """
    query = _get_event_and_occurrence_query(occurrence_id, include_deleted)
    row = session.execute(query).one()
    return row._tuple()

225 

226 

def _get_event_and_occurrence_one_or_none(
    session: Session, occurrence_id: int, context: CouchersContext, include_deleted: bool = False
) -> tuple[Event, EventOccurrence] | None:
    """
    Loads the (event, occurrence) pair for `occurrence_id` as visible to `context`.

    Returns None when no row matches, which covers both a non-existent occurrence and one
    that the visibility filtering hides from this context.
    """
    query = _get_event_and_occurrence_query(occurrence_id, include_deleted, context=context)
    row = session.execute(query).one_or_none()
    if not row:
        return None
    return row._tuple()

234 

235 

def _check_occurrence_time_validity(start_time: datetime, end_time: datetime, context: CouchersContext) -> None:
    """
    Validates the time window of an occurrence, aborting with INVALID_ARGUMENT on a violated rule.

    Rules, checked in this order:
    - the occurrence must not start in the past
    - it must not end before it starts
    - it must not last longer than 7 days
    - it must not start more than 365 days from now
    """
    longest_duration = timedelta(days=7)
    longest_lead_time = timedelta(days=365)

    if now() > start_time:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_in_past")
    if start_time > end_time:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_ends_before_starts")
    if end_time - start_time > longest_duration:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_too_long")
    if start_time - now() > longest_lead_time:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_too_far_in_future")

245 

246 

def get_users_to_notify_for_new_event(session: Session, occurrence: EventOccurrence) -> tuple[list[User], int | None]:
    """
    Picks the recipients of "new event" notifications for `occurrence`.

    Returns a pair of (users to notify, id of the community being notified). The id is None
    when recipients were chosen by a geographic search instead of community membership.

    - Communities at region level or above: nobody is notified.
    - Creator is an admin of the community's official cluster, or that cluster is a leaf:
      all visible members of the cluster are notified.
    - Otherwise: visible users subscribed to the cluster that are within the search radius
      of the occurrence.
    """
    parent_node = occurrence.event.parent_node
    cluster = parent_node.official_cluster

    if parent_node.node_type.value <= NodeType.region.value:
        logger.info("Global, macroregion, and region communities are too big for email notifications.")
        return [], occurrence.event.parent_node_id

    if occurrence.creator_user in cluster.admins or cluster.is_leaf:
        return list(cluster.members.where(User.is_visible)), occurrence.event.parent_node_id

    max_radius = 20000  # m
    # NOTE(review): dividing by 111111 presumably converts metres to degrees - confirm against the geom SRID
    radius = max_radius / 111111
    nearby_query = (
        select(User)
        .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
        .where(User.is_visible)
        .where(ClusterSubscription.cluster_id == cluster.id)
        .where(func.ST_DWithin(User.geom, occurrence.geom, radius))
    )
    nearby_users = session.execute(nearby_query).scalars().all()
    return cast(tuple[list[User], int | None], (nearby_users, None))

271 

272 

def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload) -> None:
    """
    Background job that fans out "event created" notifications for one occurrence.

    Recipients come from `get_users_to_notify_for_new_event`; those for whom `is_not_visible`
    holds against the occurrence's creator are skipped. The notification topic depends on
    `payload.approved`. If the inviting user no longer exists, an error is logged and nothing
    is sent.
    """
    # Import here to avoid circular dependency
    from couchers.servicers.communities import community_to_pb  # noqa: PLC0415

    logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}")

    # the topic only depends on the payload, so it is picked once for all recipients
    if payload.approved:
        topic_action = NotificationTopicAction.event__create_approved
    else:
        topic_action = NotificationTopicAction.event__create_any

    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
        creator = occurrence.creator_user

        recipients, node_id = get_users_to_notify_for_new_event(session, occurrence)

        inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none()

        if not inviting_user:
            logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?")
            return

        # no community id means the recipients were found by the geographic search
        is_geo_based = node_id is None

        for recipient in recipients:
            if is_not_visible(session, recipient.id, creator.id):
                continue
            # each recipient gets the event rendered from their own point of view
            recipient_context = make_notification_user_context(user_id=recipient.id)
            notification_data = notification_data_pb2.EventCreate(
                event=event_to_pb(session, occurrence, recipient_context),
                inviting_user=user_model_to_pb(inviting_user, session, recipient_context),
                nearby=True if is_geo_based else None,
                in_community=(
                    None if is_geo_based else community_to_pb(session, event.parent_node, recipient_context)
                ),
            )
            notify(
                session,
                user_id=recipient.id,
                topic_action=topic_action,
                key=str(payload.occurrence_id),
                data=notification_data,
                moderation_state_id=occurrence.moderation_state_id,
            )

316 

317 

def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload) -> None:
    """
    Background job that notifies about an updated occurrence.

    Recipients are the union of the event's subscribers and the occurrence's attendees, minus
    those for whom `is_not_visible` holds against the updating user. Each notification lists
    the changed fields from `payload.updated_items`.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)

        updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one()

        subscriber_ids = [subscriber.id for subscriber in event.subscribers]
        attendee_ids = [attendance.user_id for attendance in occurrence.attendances]

        # a set, so someone who both subscribes and attends is notified only once
        for recipient_id in set(subscriber_ids + attendee_ids):
            if is_not_visible(session, recipient_id, updating_user.id):
                continue
            recipient_context = make_notification_user_context(user_id=recipient_id)
            notification_data = notification_data_pb2.EventUpdate(
                event=event_to_pb(session, occurrence, recipient_context),
                updating_user=user_model_to_pb(updating_user, session, recipient_context),
                updated_items=payload.updated_items,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.event__update,
                key=str(payload.occurrence_id),
                data=notification_data,
                moderation_state_id=occurrence.moderation_state_id,
            )

343 

344 

def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload) -> None:
    """
    Background job that notifies about a cancelled occurrence.

    Recipients are the union of the event's subscribers and the occurrence's attendees, minus
    those for whom `is_not_visible` holds against the cancelling user.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)

        cancelling_user = session.execute(select(User).where(User.id == payload.cancelling_user_id)).scalar_one()

        subscriber_ids = [subscriber.id for subscriber in event.subscribers]
        attendee_ids = [attendance.user_id for attendance in occurrence.attendances]

        # a set, so someone who both subscribes and attends is notified only once
        for recipient_id in set(subscriber_ids + attendee_ids):
            if is_not_visible(session, recipient_id, cancelling_user.id):
                continue
            recipient_context = make_notification_user_context(user_id=recipient_id)
            notification_data = notification_data_pb2.EventCancel(
                event=event_to_pb(session, occurrence, recipient_context),
                cancelling_user=user_model_to_pb(cancelling_user, session, recipient_context),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.event__cancel,
                key=str(payload.occurrence_id),
                data=notification_data,
                moderation_state_id=occurrence.moderation_state_id,
            )

369 

370 

def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload) -> None:
    """
    Background job that notifies about a deleted occurrence.

    The occurrence is loaded with `include_deleted=True`, since by the time the job runs it is
    expected to be flagged as deleted. Recipients are the union of the event's subscribers and
    the occurrence's attendees; unlike the update/cancel jobs, no `is_not_visible` check is made.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(
            session, occurrence_id=payload.occurrence_id, include_deleted=True
        )

        subscriber_ids = [subscriber.id for subscriber in event.subscribers]
        attendee_ids = [attendance.user_id for attendance in occurrence.attendances]

        # a set, so someone who both subscribes and attends is notified only once
        for recipient_id in set(subscriber_ids + attendee_ids):
            recipient_context = make_notification_user_context(user_id=recipient_id)
            notification_data = notification_data_pb2.EventDelete(
                event=event_to_pb(session, occurrence, recipient_context),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.event__delete,
                key=str(payload.occurrence_id),
                data=notification_data,
                moderation_state_id=occurrence.moderation_state_id,
            )

392 

393 

394class Events(events_pb2_grpc.EventsServicer): 

395 def CreateEvent( 

396 self, request: events_pb2.CreateEventReq, context: CouchersContext, session: Session 

397 ) -> events_pb2.Event: 

398 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

399 if not has_completed_profile(session, user): 

400 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_create_event") 

401 if not request.title: 

402 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_title") 

403 if not request.content: 

404 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_content") 

405 if request.HasField("online_information"): 

406 online = True 

407 geom = None 

408 address = None 

409 if not request.online_information.link: 

410 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_requires_link") 

411 link = request.online_information.link 

412 elif request.HasField("offline_information"): 412 ↛ 427line 412 didn't jump to line 427 because the condition on line 412 was always true

413 online = False 

414 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value 

415 if not ( 

416 request.offline_information.address 

417 and request.offline_information.lat 

418 and request.offline_information.lng 

419 ): 

420 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_or_location") 

421 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 421 ↛ 422line 421 didn't jump to line 422 because the condition on line 421 was never true

422 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

423 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

424 address = request.offline_information.address 

425 link = None 

426 else: 

427 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_location_or_link") 

428 

429 start_time = to_aware_datetime(request.start_time) 

430 end_time = to_aware_datetime(request.end_time) 

431 

432 _check_occurrence_time_validity(start_time, end_time, context) 

433 

434 if request.parent_community_id: 

435 parent_node = session.execute( 

436 select(Node).where(Node.id == request.parent_community_id) 

437 ).scalar_one_or_none() 

438 

439 if not parent_node or not parent_node.official_cluster.small_community_features_enabled: 439 ↛ 440line 439 didn't jump to line 440 because the condition on line 439 was never true

440 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled") 

441 else: 

442 if online: 

443 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_missing_parent_community") 

444 # parent community computed from geom 

445 parent_node = get_parent_node_at_location(session, not_none(geom)) 

446 

447 if not parent_node: 447 ↛ 448line 447 didn't jump to line 448 because the condition on line 447 was never true

448 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "community_not_found") 

449 

450 if ( 

451 request.photo_key 

452 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

453 ): 

454 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "photo_not_found") 

455 

456 thread = Thread() 

457 session.add(thread) 

458 session.flush() 

459 

460 event = Event( 

461 title=request.title, 

462 parent_node_id=parent_node.id, 

463 owner_user_id=context.user_id, 

464 thread_id=thread.id, 

465 creator_user_id=context.user_id, 

466 ) 

467 session.add(event) 

468 session.flush() 

469 

470 occurrence: EventOccurrence | None = None 

471 

472 def create_occurrence(moderation_state_id: int) -> int: 

473 nonlocal occurrence 

474 occurrence = EventOccurrence( 

475 event_id=event.id, 

476 content=request.content, 

477 geom=geom, 

478 address=address, 

479 link=link, 

480 photo_key=request.photo_key if request.photo_key != "" else None, 

481 # timezone=timezone, 

482 during=TimestamptzRange(start_time, end_time), 

483 creator_user_id=context.user_id, 

484 moderation_state_id=moderation_state_id, 

485 ) 

486 session.add(occurrence) 

487 session.flush() 

488 return occurrence.id 

489 

490 create_moderation( 

491 session=session, 

492 object_type=ModerationObjectType.event_occurrence, 

493 object_id=create_occurrence, 

494 creator_user_id=context.user_id, 

495 ) 

496 

497 assert occurrence is not None 

498 

499 session.add( 

500 EventOrganizer( 

501 user_id=context.user_id, 

502 event_id=event.id, 

503 ) 

504 ) 

505 

506 session.add( 

507 EventSubscription( 

508 user_id=context.user_id, 

509 event_id=event.id, 

510 ) 

511 ) 

512 

513 session.add( 

514 EventOccurrenceAttendee( 

515 user_id=context.user_id, 

516 occurrence_id=occurrence.id, 

517 attendee_status=AttendeeStatus.going, 

518 ) 

519 ) 

520 

521 session.commit() 

522 

523 log_event( 

524 context, 

525 session, 

526 "event.created", 

527 { 

528 "event_id": event.id, 

529 "occurrence_id": occurrence.id, 

530 "parent_community_id": parent_node.id, 

531 "parent_community_name": parent_node.official_cluster.name, 

532 "online": online, 

533 }, 

534 ) 

535 

536 if has_completed_profile(session, user): 536 ↛ 547line 536 didn't jump to line 547 because the condition on line 536 was always true

537 queue_job( 

538 session, 

539 job=generate_event_create_notifications, 

540 payload=jobs_pb2.GenerateEventCreateNotificationsPayload( 

541 inviting_user_id=user.id, 

542 occurrence_id=occurrence.id, 

543 approved=False, 

544 ), 

545 ) 

546 

547 return event_to_pb(session, occurrence, context) 

548 

549 def ScheduleEvent( 

550 self, request: events_pb2.ScheduleEventReq, context: CouchersContext, session: Session 

551 ) -> events_pb2.Event: 

552 if not request.content: 552 ↛ 553line 552 didn't jump to line 553 because the condition on line 552 was never true

553 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_content") 

554 if request.HasField("online_information"): 

555 geom = None 

556 address = None 

557 link = request.online_information.link 

558 elif request.HasField("offline_information"): 558 ↛ 571line 558 didn't jump to line 571 because the condition on line 558 was always true

559 if not ( 559 ↛ 564line 559 didn't jump to line 564 because the condition on line 559 was never true

560 request.offline_information.address 

561 and request.offline_information.lat 

562 and request.offline_information.lng 

563 ): 

564 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_or_location") 

565 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 565 ↛ 566line 565 didn't jump to line 566 because the condition on line 565 was never true

566 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

567 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

568 address = request.offline_information.address 

569 link = None 

570 else: 

571 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_location_or_link") 

572 

573 start_time = to_aware_datetime(request.start_time) 

574 end_time = to_aware_datetime(request.end_time) 

575 

576 _check_occurrence_time_validity(start_time, end_time, context) 

577 

578 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context) 

579 if not res: 579 ↛ 580line 579 didn't jump to line 580 because the condition on line 579 was never true

580 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

581 

582 event, occurrence = res 

583 

584 if not _can_edit_event(session, event, context.user_id): 584 ↛ 585line 584 didn't jump to line 585 because the condition on line 584 was never true

585 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

586 

587 if occurrence.is_cancelled: 587 ↛ 588line 587 didn't jump to line 588 because the condition on line 587 was never true

588 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

589 

590 if ( 590 ↛ 594line 590 didn't jump to line 594 because the condition on line 590 was never true

591 request.photo_key 

592 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

593 ): 

594 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "photo_not_found") 

595 

596 during = TimestamptzRange(start_time, end_time) 

597 

598 # && is the overlap operator for ranges 

599 if ( 

600 session.execute( 

601 select(EventOccurrence.id) 

602 .where(EventOccurrence.event_id == event.id) 

603 .where(EventOccurrence.during.op("&&")(during)) 

604 .limit(1) 

605 ) 

606 .scalars() 

607 .one_or_none() 

608 is not None 

609 ): 

610 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_overlap") 

611 

612 new_occurrence: EventOccurrence | None = None 

613 

614 def create_occurrence(moderation_state_id: int) -> int: 

615 nonlocal new_occurrence 

616 new_occurrence = EventOccurrence( 

617 event_id=event.id, 

618 content=request.content, 

619 geom=geom, 

620 address=address, 

621 link=link, 

622 photo_key=request.photo_key if request.photo_key != "" else None, 

623 # timezone=timezone, 

624 during=during, 

625 creator_user_id=context.user_id, 

626 moderation_state_id=moderation_state_id, 

627 ) 

628 session.add(new_occurrence) 

629 session.flush() 

630 return new_occurrence.id 

631 

632 create_moderation( 

633 session=session, 

634 object_type=ModerationObjectType.event_occurrence, 

635 object_id=create_occurrence, 

636 creator_user_id=context.user_id, 

637 ) 

638 

639 assert new_occurrence is not None 

640 

641 session.add( 

642 EventOccurrenceAttendee( 

643 user_id=context.user_id, 

644 occurrence_id=new_occurrence.id, 

645 attendee_status=AttendeeStatus.going, 

646 ) 

647 ) 

648 

649 session.flush() 

650 

651 # TODO: notify 

652 

653 return event_to_pb(session, new_occurrence, context) 

654 

655 def UpdateEvent( 

656 self, request: events_pb2.UpdateEventReq, context: CouchersContext, session: Session 

657 ) -> events_pb2.Event: 

658 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

659 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context) 

660 if not res: 660 ↛ 661line 660 didn't jump to line 661 because the condition on line 660 was never true

661 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

662 

663 event, occurrence = res 

664 

665 if not _can_edit_event(session, event, context.user_id): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true

666 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

667 

668 # the things that were updated and need to be notified about 

669 notify_updated = [] 

670 

671 if occurrence.is_cancelled: 

672 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

673 

674 occurrence_update: dict[str, Any] = {"last_edited": now()} 

675 

676 if request.HasField("title"): 

677 notify_updated.append("title") 

678 event.title = request.title.value 

679 

680 if request.HasField("content"): 

681 notify_updated.append("content") 

682 occurrence_update["content"] = request.content.value 

683 

684 if request.HasField("photo_key"): 684 ↛ 685line 684 didn't jump to line 685 because the condition on line 684 was never true

685 occurrence_update["photo_key"] = request.photo_key.value 

686 

687 if request.HasField("online_information"): 

688 notify_updated.append("location") 

689 if not request.online_information.link: 689 ↛ 690line 689 didn't jump to line 690 because the condition on line 689 was never true

690 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_requires_link") 

691 occurrence_update["link"] = request.online_information.link 

692 occurrence_update["geom"] = None 

693 occurrence_update["address"] = None 

694 elif request.HasField("offline_information"): 

695 notify_updated.append("location") 

696 occurrence_update["link"] = None 

697 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true

698 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

699 occurrence_update["geom"] = create_coordinate( 

700 request.offline_information.lat, request.offline_information.lng 

701 ) 

702 occurrence_update["address"] = request.offline_information.address 

703 

704 if request.HasField("start_time") or request.HasField("end_time"): 

705 if request.update_all_future: 705 ↛ 706line 705 didn't jump to line 706 because the condition on line 705 was never true

706 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_cant_update_all_times") 

707 if request.HasField("start_time"): 707 ↛ 711line 707 didn't jump to line 711 because the condition on line 707 was always true

708 notify_updated.append("start time") 

709 start_time = to_aware_datetime(request.start_time) 

710 else: 

711 start_time = occurrence.start_time 

712 if request.HasField("end_time"): 

713 notify_updated.append("end time") 

714 end_time = to_aware_datetime(request.end_time) 

715 else: 

716 end_time = occurrence.end_time 

717 

718 _check_occurrence_time_validity(start_time, end_time, context) 

719 

720 during = TimestamptzRange(start_time, end_time) 

721 

722 # && is the overlap operator for ranges 

723 if ( 

724 session.execute( 

725 select(EventOccurrence.id) 

726 .where(EventOccurrence.event_id == event.id) 

727 .where(EventOccurrence.id != occurrence.id) 

728 .where(EventOccurrence.during.op("&&")(during)) 

729 .limit(1) 

730 ) 

731 .scalars() 

732 .one_or_none() 

733 is not None 

734 ): 

735 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_overlap") 

736 

737 occurrence_update["during"] = during 

738 

739 # TODO 

740 # if request.HasField("timezone"): 

741 # occurrence_update["timezone"] = request.timezone 

742 

743 # allow editing any event which hasn't ended more than 24 hours before now 

744 # when editing all future events, we edit all which have not yet ended 

745 

746 cutoff_time = now() - timedelta(hours=24) 

747 if request.update_all_future: 

748 session.execute( 

749 update(EventOccurrence) 

750 .where(EventOccurrence.end_time >= cutoff_time) 

751 .where(EventOccurrence.start_time >= occurrence.start_time) 

752 .values(occurrence_update) 

753 .execution_options(synchronize_session=False) 

754 ) 

755 else: 

756 if occurrence.end_time < cutoff_time: 756 ↛ 757line 756 didn't jump to line 757 because the condition on line 756 was never true

757 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

758 session.execute( 

759 update(EventOccurrence) 

760 .where(EventOccurrence.end_time >= cutoff_time) 

761 .where(EventOccurrence.id == occurrence.id) 

762 .values(occurrence_update) 

763 .execution_options(synchronize_session=False) 

764 ) 

765 

766 session.flush() 

767 

768 if notify_updated: 

769 if request.should_notify: 

770 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying") 

771 

772 queue_job( 

773 session, 

774 job=generate_event_update_notifications, 

775 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload( 

776 updating_user_id=user.id, 

777 occurrence_id=occurrence.id, 

778 updated_items=notify_updated, 

779 ), 

780 ) 

781 else: 

782 logger.info( 

783 f"Fields {','.join(notify_updated)} updated in event {event.id=}, but skipping notifications" 

784 ) 

785 

786 # since we have synchronize_session=False, we have to refresh the object 

787 session.refresh(occurrence) 

788 

789 return event_to_pb(session, occurrence, context) 

790 

def GetEvent(self, request: events_pb2.GetEventReq, context: CouchersContext, session: Session) -> events_pb2.Event:
    """Return the event occurrence identified by ``request.event_id``.

    Occurrences flagged as deleted are excluded and the query is passed
    through ``where_moderated_content_visible`` (non-list mode). If no row
    remains, the call aborts with NOT_FOUND / "event_not_found".
    """
    lookup = where_moderated_content_visible(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=False,
    )
    found = session.execute(lookup).scalar_one_or_none()

    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    return event_to_pb(session, found, context)

800 

def CancelEvent(
    self, request: events_pb2.CancelEventReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Flag an event occurrence as cancelled and queue the cancellation notifications job.

    Aborts with:
      * NOT_FOUND / "event_not_found" when the occurrence lookup returns nothing,
      * PERMISSION_DENIED / "event_edit_permission_denied" when ``_can_edit_event`` is false
        for the calling user,
      * FAILED_PRECONDITION / "event_cant_cancel_old_event" when the occurrence ended
        more than 24 hours ago.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = found

    caller_may_edit = _can_edit_event(session, event, context.user_id)
    if not caller_may_edit:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")

    # occurrences that ended more than a day ago can no longer be cancelled
    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_cancel_old_event")

    occurrence.is_cancelled = True

    log_event(context, session, "event.cancelled", {"event_id": event.id, "occurrence_id": occurrence.id})

    # the notifications themselves are produced by a background job
    cancel_payload = jobs_pb2.GenerateEventCancelNotificationsPayload(
        cancelling_user_id=context.user_id,
        occurrence_id=occurrence.id,
    )
    queue_job(session, job=generate_event_cancel_notifications, payload=cancel_payload)

    return empty_pb2.Empty()

830 

def RequestCommunityInvite(
    self, request: events_pb2.RequestCommunityInviteReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Record a community invite request for an event occurrence and send the request email.

    Aborts with:
      * NOT_FOUND / "event_not_found" when the occurrence lookup returns nothing,
      * PERMISSION_DENIED / "event_edit_permission_denied" when the caller cannot edit the event,
      * PERMISSION_DENIED / "event_cant_update_cancelled_event" when the occurrence is cancelled,
      * FAILED_PRECONDITION / "event_cant_update_old_event" when it ended more than 24 hours ago,
      * FAILED_PRECONDITION / "event_community_invite_already_requested" when the caller
        already has a request on this occurrence,
      * FAILED_PRECONDITION / "event_community_invite_already_approved" when any request
        on this occurrence has been approved.
    """
    # NOTE: the previous version loaded the calling User row here but never used it;
    # that extra query has been dropped.
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not res:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = res

    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    existing_requests = occurrence.community_invite_requests

    # the "already requested" check deliberately runs before the "already approved" one,
    # matching the original error precedence
    if any(existing.user_id == context.user_id for existing in existing_requests):
        context.abort_with_error_code(
            grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_requested"
        )

    if any(existing.approved for existing in existing_requests):
        context.abort_with_error_code(
            grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_approved"
        )

    req = EventCommunityInviteRequest(
        occurrence_id=request.event_id,
        user_id=context.user_id,
    )
    session.add(req)
    # flush before handing the new request object to the email helper
    session.flush()

    send_event_community_invite_request_email(session, req)

    return empty_pb2.Empty()

874 

def ListEventOccurrences(
    self, request: events_pb2.ListEventOccurrencesReq, context: CouchersContext, session: Session
) -> events_pb2.ListEventOccurrencesRes:
    """List the occurrences belonging to the same event as ``request.event_id``.

    ``request.event_id`` identifies one occurrence; all non-deleted occurrences
    sharing its ``event_id`` are listed. Cancelled occurrences are left out unless
    ``request.include_cancelled`` is set. With ``request.past`` unset, occurrences
    ending after the page token are returned by ascending start time; otherwise
    occurrences ending before it are returned by descending start time.

    Aborts with NOT_FOUND / "event_not_found" when the reference occurrence cannot be loaded.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    # the page token is a unix timestamp of where we left off; default to the current time
    if request.page_token:
        page_token = dt_from_millis(int(request.page_token))
    else:
        page_token = now()

    reference_query = where_moderated_content_visible(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=False,
    )
    occurrence = session.execute(reference_query).scalar_one_or_none()
    if not occurrence:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    listing = where_moderated_content_visible(
        select(EventOccurrence)
        .where(EventOccurrence.event_id == occurrence.event_id)
        .where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=True,
    )

    if not request.include_cancelled:
        listing = listing.where(~EventOccurrence.is_cancelled)

    # the cutoff is widened by one second in the direction of travel
    one_second = timedelta(seconds=1)
    if request.past:
        cutoff = page_token + one_second
        listing = listing.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())
    else:
        cutoff = page_token - one_second
        listing = listing.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())

    # fetch one extra row so we can tell whether another page exists
    occurrences = session.execute(listing.limit(page_size + 1)).scalars().all()
    has_next_page = len(occurrences) > page_size

    return events_pb2.ListEventOccurrencesRes(
        events=[event_to_pb(session, item, context) for item in occurrences[:page_size]],
        next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if has_next_page else None,
    )

915 

def ListEventAttendees(
    self, request: events_pb2.ListEventAttendeesReq, context: CouchersContext, session: Session
) -> events_pb2.ListEventAttendeesRes:
    """Return a page of attendee user ids for the occurrence ``request.event_id``.

    Results are ordered by user id. The page token, when present, is the user id
    to resume from (inclusive). Aborts with NOT_FOUND / "event_not_found" when the
    occurrence cannot be loaded.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    occurrence_query = where_moderated_content_visible(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=False,
    )
    occurrence = session.execute(occurrence_query).scalar_one_or_none()
    if not occurrence:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    # one row more than the page size is requested to detect a following page
    attendee_query = where_users_column_visible(
        select(EventOccurrenceAttendee)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
        .where(EventOccurrenceAttendee.user_id >= next_user_id)
        .order_by(EventOccurrenceAttendee.user_id)
        .limit(page_size + 1),
        context,
        EventOccurrenceAttendee.user_id,
    )
    attendees = session.execute(attendee_query).scalars().all()
    has_next_page = len(attendees) > page_size

    return events_pb2.ListEventAttendeesRes(
        attendee_user_ids=[row.user_id for row in attendees[:page_size]],
        next_page_token=str(attendees[-1].user_id) if has_next_page else None,
    )

952 

def ListEventSubscribers(
    self, request: events_pb2.ListEventSubscribersReq, context: CouchersContext, session: Session
) -> events_pb2.ListEventSubscribersRes:
    """Return a page of subscriber user ids for the event owning occurrence ``request.event_id``.

    Results are ordered by user id. The page token, when present, is the user id
    to resume from (inclusive). Aborts with NOT_FOUND / "event_not_found" when the
    event/occurrence lookup returns nothing.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
    event, occurrence = found

    # subscriptions are attached to the event, not to the individual occurrence
    subscriber_query = where_users_column_visible(
        select(EventSubscription)
        .where(EventSubscription.event_id == event.id)
        .where(EventSubscription.user_id >= next_user_id)
        .order_by(EventSubscription.user_id)
        .limit(page_size + 1),
        context,
        EventSubscription.user_id,
    )
    subscribers = session.execute(subscriber_query).scalars().all()
    has_next_page = len(subscribers) > page_size

    return events_pb2.ListEventSubscribersRes(
        subscriber_user_ids=[row.user_id for row in subscribers[:page_size]],
        next_page_token=str(subscribers[-1].user_id) if has_next_page else None,
    )

981 

def ListEventOrganizers(
    self, request: events_pb2.ListEventOrganizersReq, context: CouchersContext, session: Session
) -> events_pb2.ListEventOrganizersRes:
    """Return a page of organizer user ids for the event owning occurrence ``request.event_id``.

    Results are ordered by user id. The page token, when present, is the user id
    to resume from (inclusive). Aborts with NOT_FOUND / "event_not_found" when the
    event/occurrence lookup returns nothing.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
    event, occurrence = found

    # organizers are attached to the event, not to the individual occurrence
    organizer_query = where_users_column_visible(
        select(EventOrganizer)
        .where(EventOrganizer.event_id == event.id)
        .where(EventOrganizer.user_id >= next_user_id)
        .order_by(EventOrganizer.user_id)
        .limit(page_size + 1),
        context,
        EventOrganizer.user_id,
    )
    organizers = session.execute(organizer_query).scalars().all()
    has_next_page = len(organizers) > page_size

    return events_pb2.ListEventOrganizersRes(
        organizer_user_ids=[row.user_id for row in organizers[:page_size]],
        next_page_token=str(organizers[-1].user_id) if has_next_page else None,
    )

1010 

def TransferEvent(
    self, request: events_pb2.TransferEventReq, context: CouchersContext, session: Session
) -> events_pb2.Event:
    """Transfer ownership of an event from a user to a group or community cluster.

    The target is selected by the ``new_owner`` oneof of the request:
      * ``new_owner_group_id``: the non-official cluster with that id,
      * ``new_owner_community_id``: the official cluster whose parent node has that id.

    Aborts with:
      * NOT_FOUND / "event_not_found" when the occurrence lookup returns nothing,
      * PERMISSION_DENIED / "event_transfer_permission_denied" when the caller cannot edit the event,
      * PERMISSION_DENIED / "event_cant_update_cancelled_event" when the occurrence is cancelled,
      * FAILED_PRECONDITION / "event_cant_update_old_event" when it ended more than 24 hours ago,
      * NOT_FOUND / "group_or_community_not_found" when no matching cluster exists or the
        ``new_owner`` oneof is not set at all.

    Returns the updated event as seen by the caller.
    """
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not res:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = res

    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_transfer_permission_denied")

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    new_owner_kind = request.WhichOneof("new_owner")
    if new_owner_kind == "new_owner_group_id":
        cluster = session.execute(
            select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
        ).scalar_one_or_none()
    elif new_owner_kind == "new_owner_community_id":
        cluster = session.execute(
            select(Cluster)
            .where(Cluster.parent_node_id == request.new_owner_community_id)
            .where(Cluster.is_official_cluster)
        ).scalar_one_or_none()
    else:
        # WhichOneof returns None when the oneof is unset; previously `cluster` was left
        # unbound in that case and the check below raised UnboundLocalError
        cluster = None

    if not cluster:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_or_community_not_found")

    # the event is owned either by a user or by a cluster, so clear the user owner
    event.owner_user = None
    event.owner_cluster = cluster

    session.commit()
    return event_to_pb(session, occurrence, context)

1048 

def SetEventSubscription(
    self, request: events_pb2.SetEventSubscriptionReq, context: CouchersContext, session: Session
) -> events_pb2.Event:
    """Subscribe the caller to, or unsubscribe the caller from, an event.

    ``request.subscribe`` is the desired state; nothing is written when the
    caller is already in that state. The change is logged as
    "event.subscription_set" either way.

    Aborts with:
      * NOT_FOUND / "event_not_found" when the occurrence lookup returns nothing,
      * PERMISSION_DENIED / "event_cant_update_cancelled_event" when the occurrence is cancelled,
      * FAILED_PRECONDITION / "event_cant_update_old_event" when it ended more than 24 hours ago.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = found

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    subscription_query = (
        select(EventSubscription)
        .where(EventSubscription.user_id == context.user_id)
        .where(EventSubscription.event_id == event.id)
    )
    current_subscription = session.execute(subscription_query).scalar_one_or_none()

    if request.subscribe:
        # only create a row when the caller is not subscribed yet
        if not current_subscription:
            session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
    elif current_subscription:
        # unsubscribing while subscribed: drop the existing row
        session.delete(current_subscription)

    session.flush()

    log_event(
        context,
        session,
        "event.subscription_set",
        {"event_id": event.id, "occurrence_id": occurrence.id, "subscribed": request.subscribe},
    )

    return event_to_pb(session, occurrence, context)

1088 

def SetEventAttendance(
    self, request: events_pb2.SetEventAttendanceReq, context: CouchersContext, session: Session
) -> events_pb2.Event:
    """Set the caller's attendance state for the occurrence ``request.event_id``.

    ATTENDANCE_STATE_NOT_GOING deletes an existing attendance row, if any. Any
    other state updates the existing row or creates a new one, using the
    ``attendancestate2sql`` mapping. The change is logged as "event.attendance_set".

    Aborts with:
      * NOT_FOUND / "event_not_found" when the occurrence cannot be loaded,
      * PERMISSION_DENIED / "event_cant_update_cancelled_event" when the occurrence is cancelled,
      * FAILED_PRECONDITION / "event_cant_update_old_event" when it ended more than 24 hours ago.
    """
    occurrence_query = where_moderated_content_visible(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=False,
    )
    occurrence = session.execute(occurrence_query).scalar_one_or_none()

    if not occurrence:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    attendance_query = (
        select(EventOccurrenceAttendee)
        .where(EventOccurrenceAttendee.user_id == context.user_id)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
    )
    current_attendance = session.execute(attendance_query).scalar_one_or_none()

    if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING:
        # if there is no attendance row, not going means there is nothing to do
        if current_attendance:
            session.delete(current_attendance)
    elif current_attendance:
        current_attendance.attendee_status = attendancestate2sql[request.attendance_state]  # type: ignore[assignment]
    else:
        # no row yet for this user and occurrence: create one
        session.add(
            EventOccurrenceAttendee(
                user_id=context.user_id,
                occurrence_id=occurrence.id,
                attendee_status=not_none(attendancestate2sql[request.attendance_state]),
            )
        )

    session.flush()

    log_event(
        context,
        session,
        "event.attendance_set",
        {"occurrence_id": occurrence.id, "attendance_state": request.attendance_state},
    )

    return event_to_pb(session, occurrence, context)

1144 

def ListMyEvents(
    self, request: events_pb2.ListMyEventsReq, context: CouchersContext, session: Session
) -> events_pb2.ListMyEventsRes:
    """List event occurrences related to the calling user.

    An occurrence is included when it matches at least one selected relation:
    subscribed, organizing, attending, or hosted in one of the caller's communities.
    When none of the four request flags is set, all four relations are selected.
    ``request.exclude_attending`` additionally drops occurrences the caller attends
    or organizes, and cannot be combined with the attending relation.

    Pagination works either by ``request.page_number`` (offset based) or by
    ``request.page_token`` (a timestamp token, ignored when a page number is given).
    ``total_items`` is the count of all matching rows before pagination.

    Aborts with INVALID_ARGUMENT / "cannot_combine_attending_and_exclude_attending"
    when attending is selected together with ``exclude_attending``.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    # the page token is a unix timestamp of where we left off; it only applies
    # when the caller is not paginating by page number
    if request.page_token and not request.page_number:
        page_token = dt_from_millis(int(request.page_token))
    else:
        page_token = now()

    # page numbers are 1-based; translate the current page into a row offset
    page_number = request.page_number or 1
    offset = (page_number - 1) * page_size

    query = (
        select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
    )
    query = where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=True)

    include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities)
    include_subscribed = request.subscribed or include_all
    include_organizing = request.organizing or include_all
    include_attending = request.attending or include_all
    include_my_communities = request.my_communities or include_all

    if include_attending and request.exclude_attending:
        context.abort_with_error_code(
            grpc.StatusCode.INVALID_ARGUMENT, "cannot_combine_attending_and_exclude_attending"
        )

    # join conditions that tie each relation table to the calling user
    subscription_join = and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id)
    organizer_join = and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id)
    attendee_join = and_(
        EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
        EventOccurrenceAttendee.user_id == context.user_id,
    )

    # alternatives that are OR-ed together: a row qualifies if any relation matches
    relation_filters = []

    if include_subscribed:
        query = query.outerjoin(EventSubscription, subscription_join)
        relation_filters.append(EventSubscription.user_id.is_not(None))
    if include_organizing:
        query = query.outerjoin(EventOrganizer, organizer_join)
        relation_filters.append(EventOrganizer.user_id.is_not(None))
    if include_attending or request.exclude_attending:
        query = query.outerjoin(EventOccurrenceAttendee, attendee_join)
        if include_attending:
            relation_filters.append(EventOccurrenceAttendee.user_id.is_not(None))
        else:
            # only reachable with exclude_attending set: keep rows where the caller
            # is neither attending nor organizing
            if not include_organizing:
                # the organizer table has not been joined yet in this case
                query = query.outerjoin(EventOrganizer, organizer_join)
            query = query.where(EventOccurrenceAttendee.user_id.is_(None), EventOrganizer.user_id.is_(None))
    if include_my_communities:
        community_ids_query = (
            select(Node.id)
            .join(Cluster, Cluster.parent_node_id == Node.id)
            .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
            .where(ClusterSubscription.user_id == context.user_id)
            .where(Cluster.is_official_cluster)
            .order_by(Node.id)
            .limit(100000)
        )
        my_communities = session.execute(community_ids_query).scalars().all()
        relation_filters.append(Event.parent_node_id.in_(my_communities))

    query = query.where(or_(*relation_filters))

    if request.my_communities_exclude_global:
        query = query.join(Node, Node.id == Event.parent_node_id).where(Node.node_type > NodeType.region)

    if not request.include_cancelled:
        query = query.where(~EventOccurrence.is_cancelled)

    # the cutoff is widened by one second in the direction of travel
    one_second = timedelta(seconds=1)
    if request.past:
        cutoff = page_token + one_second
        query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())
    else:
        cutoff = page_token - one_second
        query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())

    # count all matching rows before any offset/limit is applied
    total_items = session.execute(select(func.count()).select_from(query.subquery())).scalar()

    if request.page_number:
        # page-number pagination: exact window
        query = query.offset(offset).limit(page_size)
    else:
        # token pagination: fetch one extra row to detect a following page
        query = query.limit(page_size + 1)
    occurrences = session.execute(query).scalars().all()
    has_next_page = len(occurrences) > page_size

    return events_pb2.ListMyEventsRes(
        events=[event_to_pb(session, item, context) for item in occurrences[:page_size]],
        next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if has_next_page else None,
        total_items=total_items,
    )

1244 

def ListAllEvents(
    self, request: events_pb2.ListAllEventsReq, context: CouchersContext, session: Session
) -> events_pb2.ListAllEventsRes:
    """List all non-deleted event occurrences, paginated by a timestamp token.

    Cancelled occurrences are left out unless ``request.include_cancelled`` is set.
    With ``request.past`` unset, occurrences ending after the page token are returned
    by ascending start time; otherwise occurrences ending before it are returned by
    descending start time.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    # the page token is a unix timestamp of where we left off; default to the current time
    if request.page_token:
        page_token = dt_from_millis(int(request.page_token))
    else:
        page_token = now()

    listing = where_moderated_content_visible(
        select(EventOccurrence).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=True,
    )

    if not request.include_cancelled:
        listing = listing.where(~EventOccurrence.is_cancelled)

    # the cutoff is widened by one second in the direction of travel
    one_second = timedelta(seconds=1)
    if request.past:
        cutoff = page_token + one_second
        listing = listing.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())
    else:
        cutoff = page_token - one_second
        listing = listing.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())

    # fetch one extra row so we can tell whether another page exists
    occurrences = session.execute(listing.limit(page_size + 1)).scalars().all()
    has_next_page = len(occurrences) > page_size

    return events_pb2.ListAllEventsRes(
        events=[event_to_pb(session, item, context) for item in occurrences[:page_size]],
        next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if has_next_page else None,
    )

1272 

def InviteEventOrganizer(
    self, request: events_pb2.InviteEventOrganizerReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Add ``request.user_id`` as an organizer of an event and notify that user.

    Aborts with:
      * NOT_FOUND / "event_not_found" when the occurrence lookup returns nothing,
      * PERMISSION_DENIED / "event_edit_permission_denied" when the caller cannot edit the event,
      * PERMISSION_DENIED / "event_cant_update_cancelled_event" when the occurrence is cancelled,
      * FAILED_PRECONDITION / "event_cant_update_old_event" when it ended more than 24 hours ago,
      * NOT_FOUND / "user_not_found" when the invited user is not among the users
        selected by ``users_visible(context)``.
    """
    inviter = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = found

    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    invitee_query = select(User).where(users_visible(context)).where(User.id == request.user_id)
    invitee = session.execute(invitee_query).scalar_one_or_none()
    if not invitee:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    new_organizer = EventOrganizer(
        user_id=request.user_id,
        event_id=event.id,
    )
    session.add(new_organizer)
    session.flush()

    # the notification payload is rendered with a context built for the invited user,
    # not for the caller
    other_user_context = make_notification_user_context(user_id=request.user_id)
    invite_data = notification_data_pb2.EventInviteOrganizer(
        event=event_to_pb(session, occurrence, other_user_context),
        inviting_user=user_model_to_pb(inviter, session, other_user_context),
    )

    notify(
        session,
        user_id=request.user_id,
        topic_action=NotificationTopicAction.event__invite_organizer,
        key=str(event.id),
        data=invite_data,
    )

    return empty_pb2.Empty()

1319 

def RemoveEventOrganizer(
    self, request: events_pb2.RemoveEventOrganizerReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Remove an organizer from an event.

    The organizer removed is ``request.user_id`` when that field is set,
    otherwise the calling user.

    Aborts with:
      * NOT_FOUND / "event_not_found" when the occurrence lookup returns nothing,
      * PERMISSION_DENIED / "event_cant_update_cancelled_event" when the occurrence is cancelled,
      * FAILED_PRECONDITION / "event_cant_update_old_event" when it ended more than 24 hours ago,
      * FAILED_PRECONDITION / "event_cant_remove_owner_as_organizer" when the target is the
        event's owner user,
      * FAILED_PRECONDITION / "event_edit_permission_denied" when the caller cannot edit the event,
      * FAILED_PRECONDITION / "event_not_an_organizer" when the target is not an organizer.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = found

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    # an explicit user_id targets that user; otherwise the caller removes themselves
    if request.HasField("user_id"):
        user_id_to_remove = request.user_id.value
    else:
        user_id_to_remove = context.user_id

    # the event owner can never be removed as an organizer
    # NOTE: this check runs before the edit-permission check below
    target_is_owner = event.owner_user_id == user_id_to_remove
    if target_is_owner:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_remove_owner_as_organizer")

    # the caller must be allowed to edit the event
    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_edit_permission_denied")

    # look up the organizer row that is to be deleted
    organizer_query = (
        select(EventOrganizer)
        .where(EventOrganizer.user_id == user_id_to_remove)
        .where(EventOrganizer.event_id == event.id)
    )
    organizer_to_remove = session.execute(organizer_query).scalar_one_or_none()

    if not organizer_to_remove:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_not_an_organizer")

    session.delete(organizer_to_remove)

    return empty_pb2.Empty()