Coverage for app / backend / src / couchers / servicers / events.py: 84%

546 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import logging 

2from datetime import datetime, timedelta 

3from typing import Any, cast 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from psycopg2.extras import DateTimeTZRange 

8from sqlalchemy import Select, select 

9from sqlalchemy.orm import Session 

10from sqlalchemy.sql import and_, func, or_, update 

11 

12from couchers.context import CouchersContext, make_background_user_context 

13from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope 

14from couchers.event_log import log_event 

15from couchers.helpers.completed_profile import has_completed_profile 

16from couchers.jobs.enqueue import queue_job 

17from couchers.models import ( 

18 AttendeeStatus, 

19 Cluster, 

20 ClusterSubscription, 

21 Event, 

22 EventCommunityInviteRequest, 

23 EventOccurrence, 

24 EventOccurrenceAttendee, 

25 EventOrganizer, 

26 EventSubscription, 

27 ModerationObjectType, 

28 Node, 

29 NodeType, 

30 Thread, 

31 Upload, 

32 User, 

33) 

34from couchers.models.notifications import NotificationTopicAction 

35from couchers.moderation.utils import create_moderation 

36from couchers.notifications.notify import notify 

37from couchers.proto import events_pb2, events_pb2_grpc, notification_data_pb2 

38from couchers.proto.internal import jobs_pb2 

39from couchers.servicers.api import user_model_to_pb 

40from couchers.servicers.blocking import is_not_visible 

41from couchers.servicers.threads import thread_to_pb 

42from couchers.sql import users_visible, where_moderated_content_visible, where_users_column_visible 

43from couchers.tasks import send_event_community_invite_request_email 

44from couchers.utils import ( 

45 Timestamp_from_datetime, 

46 create_coordinate, 

47 dt_from_millis, 

48 millis_from_dt, 

49 not_none, 

50 now, 

51 to_aware_datetime, 

52) 

53 

# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# API attendance state -> stored attendee status. "Not going" has no stored status and maps to None.
attendancestate2sql = {
    events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
    events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
    events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
}

# Stored attendee status -> API attendance state (the inverse of the map above).
# The None key is what event_to_pb looks up when the viewer has no attendance row for the occurrence.
attendancestate2api = {
    None: events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING,
    AttendeeStatus.going: events_pb2.AttendanceState.ATTENDANCE_STATE_GOING,
    AttendeeStatus.maybe: events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE,
}

# NOTE(review): presumably the upper bound on page sizes for the list endpoints further down the file;
# it is not referenced in the part of the file visible here - confirm against its users.
MAX_PAGINATION_LENGTH = 25

69 

70 

71def _is_event_owner(event: Event, user_id: int) -> bool: 

72 """ 

73 Checks whether the user can act as an owner of the event 

74 """ 

75 if event.owner_user: 

76 return event.owner_user_id == user_id 

77 # otherwise owned by a cluster 

78 return not_none(event.owner_cluster).admins.where(User.id == user_id).one_or_none() is not None 

79 

80 

def _is_event_organizer(event: Event, user_id: int) -> bool:
    """
    Tells whether the given user is one of the organizers of the event
    """
    organizer_row = event.organizers.where(EventOrganizer.user_id == user_id).one_or_none()
    return organizer_row is not None

86 

87 

def _can_moderate_event(session: Session, event: Event, user_id: int) -> bool:
    """
    Tells whether the given user has moderation rights over the event

    Rights come from either of two nodes: the parent node of the cluster that owns the event (if the event
    is owned by a cluster), or the parent node of the event itself.
    """
    owning_cluster = event.owner_cluster
    if owning_cluster is None or not can_moderate_node(session, user_id, owning_cluster.parent_node_id):
        # no cluster owner, or no rights through it: fall back to the event's own parent node
        return can_moderate_node(session, user_id, event.parent_node_id)
    return True

95 

96 

def _can_edit_event(session: Session, event: Event, user_id: int) -> bool:
    """
    Tells whether the given user may edit the event: owners, organizers and moderators all may

    The checks run in that order and stop at the first one that grants access.
    """
    if _is_event_owner(event, user_id):
        return True
    if _is_event_organizer(event, user_id):
        return True
    return _can_moderate_event(session, event, user_id)

103 

104 

def event_to_pb(session: Session, occurrence: EventOccurrence, context: CouchersContext) -> events_pb2.Event:
    """
    Builds the API message for one event occurrence, from the point of view of the user in `context`

    Besides the stored fields this computes: whether this occurrence is the event's next one (the occurrence
    with the earliest end time that has not passed yet), the viewer's own attendance/organizer/subscriber
    state and edit/moderation rights, and attendee/organizer/subscriber counts restricted to users that
    are visible to the viewer.
    """
    event = occurrence.event
    viewer_id = context.user_id

    def count_visible(query: Select[tuple[int]], user_id_column: Any) -> int:
        # runs a count query, keeping only the rows whose user is visible to the viewer
        return session.execute(where_users_column_visible(query, context, user_id_column)).scalar_one()

    upcoming = (
        event.occurrences.where(EventOccurrence.end_time >= now())
        .order_by(EventOccurrence.end_time.asc())
        .limit(1)
        .one_or_none()
    )
    is_next = occurrence.id == upcoming.id if upcoming else False

    # a cluster-owned event is reported as owned by a community (official cluster) or by a group (any other)
    owner_community_id = None
    owner_group_id = None
    owning_cluster = event.owner_cluster
    if owning_cluster:
        if owning_cluster.is_official_cluster:
            owner_community_id = owning_cluster.parent_node_id
        else:
            owner_group_id = owning_cluster.id

    own_attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == viewer_id).one_or_none()
    attendance_state = own_attendance.attendee_status if own_attendance else None

    can_moderate = _can_moderate_event(session, event, viewer_id)
    can_edit = _can_edit_event(session, event, viewer_id)

    attendee_counts = {
        status: count_visible(
            select(func.count())
            .select_from(EventOccurrenceAttendee)
            .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
            .where(EventOccurrenceAttendee.attendee_status == status),
            EventOccurrenceAttendee.user_id,
        )
        for status in (AttendeeStatus.going, AttendeeStatus.maybe)
    }
    organizer_count = count_visible(
        select(func.count()).select_from(EventOrganizer).where(EventOrganizer.event_id == event.id),
        EventOrganizer.user_id,
    )
    subscriber_count = count_visible(
        select(func.count()).select_from(EventSubscription).where(EventSubscription.event_id == event.id),
        EventSubscription.user_id,
    )

    # an occurrence carries online information when it has a link, offline information when it has a geom
    online_information = None
    if occurrence.link:
        online_information = events_pb2.OnlineEventInformation(link=occurrence.link)

    offline_information = None
    if occurrence.geom:
        coordinates = not_none(occurrence.coordinates)
        offline_information = events_pb2.OfflineEventInformation(
            lat=coordinates[0],
            lng=coordinates[1],
            address=occurrence.address,
        )

    viewer_is_organizer = event.organizers.where(EventOrganizer.user_id == viewer_id).one_or_none() is not None
    viewer_is_subscriber = event.subscribers.where(EventSubscription.user_id == viewer_id).one_or_none() is not None

    return events_pb2.Event(
        event_id=occurrence.id,
        is_next=is_next,
        is_cancelled=occurrence.is_cancelled,
        is_deleted=occurrence.is_deleted,
        title=event.title,
        slug=event.slug,
        content=occurrence.content,
        photo_url=occurrence.photo.full_url if occurrence.photo else None,
        photo_key=occurrence.photo_key or "",
        online_information=online_information,
        offline_information=offline_information,
        created=Timestamp_from_datetime(occurrence.created),
        last_edited=Timestamp_from_datetime(occurrence.last_edited),
        creator_user_id=occurrence.creator_user_id,
        start_time=Timestamp_from_datetime(occurrence.start_time),
        end_time=Timestamp_from_datetime(occurrence.end_time),
        timezone=occurrence.timezone,
        start_time_display=str(occurrence.start_time),
        end_time_display=str(occurrence.end_time),
        attendance_state=attendancestate2api[attendance_state],
        organizer=viewer_is_organizer,
        subscriber=viewer_is_subscriber,
        going_count=attendee_counts[AttendeeStatus.going],
        maybe_count=attendee_counts[AttendeeStatus.maybe],
        organizer_count=organizer_count,
        subscriber_count=subscriber_count,
        owner_user_id=event.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(session, event.thread_id),
        can_edit=can_edit,
        can_moderate=can_moderate,
    )

213 

214 

def _get_event_and_occurrence_query(
    occurrence_id: int,
    include_deleted: bool,
    context: CouchersContext | None = None,
) -> Select[tuple[Event, EventOccurrence]]:
    """
    Builds the query that selects one occurrence, by id, together with the event it belongs to

    Deleted occurrences are excluded unless `include_deleted` is set. When a `context` is given, the
    moderation visibility filter for that context is applied on top; without one no such filter is added.
    """
    query = select(Event, EventOccurrence).where(
        EventOccurrence.id == occurrence_id,
        EventOccurrence.event_id == Event.id,
    )

    if not include_deleted:
        query = query.where(~EventOccurrence.is_deleted)

    if context is None:
        return query

    return where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=False)

233 

234 

def _get_event_and_occurrence_one(
    session: Session, occurrence_id: int, include_deleted: bool = False
) -> tuple[Event, EventOccurrence]:
    """
    For background jobs only - no visibility filtering.

    Fetches the (event, occurrence) pair for the occurrence id using `.one()`, so exactly one matching row
    is expected.
    """
    query = _get_event_and_occurrence_query(occurrence_id, include_deleted)
    row = session.execute(query).one()
    return row._tuple()

241 

242 

def _get_event_and_occurrence_one_or_none(
    session: Session, occurrence_id: int, context: CouchersContext, include_deleted: bool = False
) -> tuple[Event, EventOccurrence] | None:
    """
    Fetches the (event, occurrence) pair for the occurrence id with the moderation visibility filter for
    `context` applied, or returns None when no row matches
    """
    query = _get_event_and_occurrence_query(occurrence_id, include_deleted, context=context)
    row = session.execute(query).one_or_none()
    if not row:
        return None
    return row._tuple()

250 

251 

def _check_occurrence_time_validity(start_time: datetime, end_time: datetime, context: CouchersContext) -> None:
    """
    Validates the time range of an occurrence, aborting the request with INVALID_ARGUMENT if it is not allowed

    The checks, in order (the first failing one determines the error code):
    - "event_in_past": the occurrence starts before the current time
    - "event_ends_before_starts": the end is before the start
    - "event_too_long": the occurrence lasts longer than 7 days
    - "event_too_far_in_future": the occurrence starts more than 365 days from now
    """
    # read the clock once so that both "now"-relative bounds are measured against the same instant
    current_time = now()

    if start_time < current_time:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_in_past")
    if end_time < start_time:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_ends_before_starts")
    if end_time - start_time > timedelta(days=7):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_too_long")
    if start_time - current_time > timedelta(days=365):
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_too_far_in_future")

261 

262 

def get_users_to_notify_for_new_event(session: Session, occurrence: EventOccurrence) -> tuple[list[User], int | None]:
    """
    Picks the users to notify about a new event, and reports which community (if any) is being notified

    Returns a pair of (users, community id); the community id is None when the users were picked by
    geographic proximity instead of by community membership:
    - parent nodes at region level or above get no notifications at all (empty list, parent node id)
    - if the creator is an admin of the parent community, or that community is a leaf, all its visible
      members are notified (members, parent node id)
    - otherwise only visible subscribers of the community within the search radius of the event (users, None)
    """
    parent_node = occurrence.event.parent_node
    cluster = parent_node.official_cluster

    if parent_node.node_type.value <= NodeType.region.value:
        logger.info("Global, macroregion, and region communities are too big for email notifications.")
        return [], occurrence.event.parent_node_id

    if occurrence.creator_user in cluster.admins or cluster.is_leaf:
        return list(cluster.members.where(User.is_visible)), occurrence.event.parent_node_id

    search_radius = 20000  # m
    # NOTE(review): the division by 111111 presumably converts meters to degrees (roughly 111 km per degree),
    # which would be the unit ST_DWithin works in for these geometries - confirm
    nearby_subscribers = (
        select(User)
        .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
        .where(User.is_visible)
        .where(ClusterSubscription.cluster_id == cluster.id)
        .where(func.ST_DWithin(User.geom, occurrence.geom, search_radius / 111111))
    )
    users = session.execute(nearby_subscribers).scalars().all()
    return cast(tuple[list[User], int | None], (users, None))

287 

288 

def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload) -> None:
    """
    Background job that generates/fans out the notifications for a newly created event occurrence

    Every user picked by get_users_to_notify_for_new_event is notified, except those for whom
    is_not_visible reports the pair (recipient, event creator) as not visible. The notification topic
    depends on `payload.approved`. Nothing is sent if the inviting user no longer exists.
    """
    from couchers.servicers.communities import community_to_pb

    logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}")

    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
        creator = occurrence.creator_user

        users, node_id = get_users_to_notify_for_new_event(session, occurrence)

        inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none()

        if not inviting_user:
            logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?")
            return

        # both of these are the same for every recipient
        if payload.approved:
            topic_action = NotificationTopicAction.event__create_approved
        else:
            topic_action = NotificationTopicAction.event__create_any
        # a node id means the recipients were picked via a community, no node id means by proximity
        via_community = node_id is not None

        for recipient in users:
            if is_not_visible(session, recipient.id, creator.id):
                continue

            # the payload is rendered from the point of view of the recipient
            recipient_context = make_background_user_context(user_id=recipient.id)
            notification_data = notification_data_pb2.EventCreate(
                event=event_to_pb(session, occurrence, recipient_context),
                inviting_user=user_model_to_pb(inviting_user, session, recipient_context),
                nearby=None if via_community else True,
                in_community=community_to_pb(session, event.parent_node, recipient_context) if via_community else None,
            )
            notify(
                session,
                user_id=recipient.id,
                topic_action=topic_action,
                key=str(payload.occurrence_id),
                data=notification_data,
                moderation_state_id=occurrence.moderation_state_id,
            )

331 

332 

def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload) -> None:
    """
    Background job that notifies the subscribers of an event and the attendees of one of its occurrences
    that the occurrence was updated

    Each user is notified at most once, and users for whom is_not_visible reports the pair
    (recipient, updating user) as not visible are skipped.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)

        updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one()

        # a set, so that somebody who both subscribes and attends is only notified once
        recipient_ids = {
            *(subscriber.id for subscriber in event.subscribers),
            *(attendance.user_id for attendance in occurrence.attendances),
        }

        for recipient_id in recipient_ids:
            if is_not_visible(session, recipient_id, updating_user.id):
                continue

            # the payload is rendered from the point of view of the recipient
            recipient_context = make_background_user_context(user_id=recipient_id)
            notification_data = notification_data_pb2.EventUpdate(
                event=event_to_pb(session, occurrence, recipient_context),
                updating_user=user_model_to_pb(updating_user, session, recipient_context),
                updated_items=payload.updated_items,
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.event__update,
                key=str(payload.occurrence_id),
                data=notification_data,
                moderation_state_id=occurrence.moderation_state_id,
            )

358 

359 

def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload) -> None:
    """
    Background job that notifies the subscribers of an event and the attendees of one of its occurrences
    that the occurrence was cancelled

    Each user is notified at most once, and users for whom is_not_visible reports the pair
    (recipient, cancelling user) as not visible are skipped.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)

        cancelling_user = session.execute(select(User).where(User.id == payload.cancelling_user_id)).scalar_one()

        # a set, so that somebody who both subscribes and attends is only notified once
        recipient_ids = {
            *(subscriber.id for subscriber in event.subscribers),
            *(attendance.user_id for attendance in occurrence.attendances),
        }

        for recipient_id in recipient_ids:
            if is_not_visible(session, recipient_id, cancelling_user.id):
                continue

            # the payload is rendered from the point of view of the recipient
            recipient_context = make_background_user_context(user_id=recipient_id)
            notification_data = notification_data_pb2.EventCancel(
                event=event_to_pb(session, occurrence, recipient_context),
                cancelling_user=user_model_to_pb(cancelling_user, session, recipient_context),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.event__cancel,
                key=str(payload.occurrence_id),
                data=notification_data,
                moderation_state_id=occurrence.moderation_state_id,
            )

384 

385 

def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload) -> None:
    """
    Background job that notifies the subscribers of an event and the attendees of one of its occurrences
    that the occurrence was deleted

    The occurrence is looked up including deleted ones. Each user is notified at most once; this job
    applies no per-recipient visibility check.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(
            session, occurrence_id=payload.occurrence_id, include_deleted=True
        )

        # a set, so that somebody who both subscribes and attends is only notified once
        recipient_ids = {
            *(subscriber.id for subscriber in event.subscribers),
            *(attendance.user_id for attendance in occurrence.attendances),
        }

        for recipient_id in recipient_ids:
            # the payload is rendered from the point of view of the recipient
            recipient_context = make_background_user_context(user_id=recipient_id)
            notification_data = notification_data_pb2.EventDelete(
                event=event_to_pb(session, occurrence, recipient_context),
            )
            notify(
                session,
                user_id=recipient_id,
                topic_action=NotificationTopicAction.event__delete,
                key=str(payload.occurrence_id),
                data=notification_data,
                moderation_state_id=occurrence.moderation_state_id,
            )

407 

408 

409class Events(events_pb2_grpc.EventsServicer): 

410 def CreateEvent( 

411 self, request: events_pb2.CreateEventReq, context: CouchersContext, session: Session 

412 ) -> events_pb2.Event: 

413 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

414 if not has_completed_profile(session, user): 

415 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_create_event") 

416 if not request.title: 

417 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_title") 

418 if not request.content: 

419 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_content") 

420 if request.HasField("online_information"): 

421 online = True 

422 geom = None 

423 address = None 

424 if not request.online_information.link: 

425 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_requires_link") 

426 link = request.online_information.link 

427 elif request.HasField("offline_information"): 427 ↛ 442line 427 didn't jump to line 442 because the condition on line 427 was always true

428 online = False 

429 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value 

430 if not ( 

431 request.offline_information.address 

432 and request.offline_information.lat 

433 and request.offline_information.lng 

434 ): 

435 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_or_location") 

436 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 436 ↛ 437line 436 didn't jump to line 437 because the condition on line 436 was never true

437 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

438 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

439 address = request.offline_information.address 

440 link = None 

441 else: 

442 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_location_or_link") 

443 

444 start_time = to_aware_datetime(request.start_time) 

445 end_time = to_aware_datetime(request.end_time) 

446 

447 _check_occurrence_time_validity(start_time, end_time, context) 

448 

449 if request.parent_community_id: 

450 parent_node = session.execute( 

451 select(Node).where(Node.id == request.parent_community_id) 

452 ).scalar_one_or_none() 

453 

454 if not parent_node or not parent_node.official_cluster.events_enabled: 454 ↛ 455line 454 didn't jump to line 455 because the condition on line 454 was never true

455 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled") 

456 else: 

457 if online: 

458 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_missing_parent_community") 

459 # parent community computed from geom 

460 parent_node = get_parent_node_at_location(session, not_none(geom)) 

461 

462 if not parent_node: 462 ↛ 463line 462 didn't jump to line 463 because the condition on line 462 was never true

463 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "community_not_found") 

464 

465 if ( 

466 request.photo_key 

467 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

468 ): 

469 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "photo_not_found") 

470 

471 thread = Thread() 

472 session.add(thread) 

473 session.flush() 

474 

475 event = Event( 

476 title=request.title, 

477 parent_node_id=parent_node.id, 

478 owner_user_id=context.user_id, 

479 thread_id=thread.id, 

480 creator_user_id=context.user_id, 

481 ) 

482 session.add(event) 

483 session.flush() 

484 

485 occurrence: EventOccurrence | None = None 

486 

487 def create_occurrence(moderation_state_id: int) -> int: 

488 nonlocal occurrence 

489 occurrence = EventOccurrence( 

490 event_id=event.id, 

491 content=request.content, 

492 geom=geom, 

493 address=address, 

494 link=link, 

495 photo_key=request.photo_key if request.photo_key != "" else None, 

496 # timezone=timezone, 

497 during=DateTimeTZRange(start_time, end_time), 

498 creator_user_id=context.user_id, 

499 moderation_state_id=moderation_state_id, 

500 ) 

501 session.add(occurrence) 

502 session.flush() 

503 return occurrence.id 

504 

505 create_moderation( 

506 session=session, 

507 object_type=ModerationObjectType.event_occurrence, 

508 object_id=create_occurrence, 

509 creator_user_id=context.user_id, 

510 ) 

511 

512 assert occurrence is not None 

513 

514 session.add( 

515 EventOrganizer( 

516 user_id=context.user_id, 

517 event_id=event.id, 

518 ) 

519 ) 

520 

521 session.add( 

522 EventSubscription( 

523 user_id=context.user_id, 

524 event_id=event.id, 

525 ) 

526 ) 

527 

528 session.add( 

529 EventOccurrenceAttendee( 

530 user_id=context.user_id, 

531 occurrence_id=occurrence.id, 

532 attendee_status=AttendeeStatus.going, 

533 ) 

534 ) 

535 

536 session.commit() 

537 

538 log_event( 

539 context, 

540 session, 

541 "event.created", 

542 { 

543 "event_id": event.id, 

544 "occurrence_id": occurrence.id, 

545 "parent_community_id": parent_node.id, 

546 "parent_community_name": parent_node.official_cluster.name, 

547 "online": online, 

548 }, 

549 ) 

550 

551 if has_completed_profile(session, user): 551 ↛ 562line 551 didn't jump to line 562 because the condition on line 551 was always true

552 queue_job( 

553 session, 

554 job=generate_event_create_notifications, 

555 payload=jobs_pb2.GenerateEventCreateNotificationsPayload( 

556 inviting_user_id=user.id, 

557 occurrence_id=occurrence.id, 

558 approved=False, 

559 ), 

560 ) 

561 

562 return event_to_pb(session, occurrence, context) 

563 

564 def ScheduleEvent( 

565 self, request: events_pb2.ScheduleEventReq, context: CouchersContext, session: Session 

566 ) -> events_pb2.Event: 

567 if not request.content: 567 ↛ 568line 567 didn't jump to line 568 because the condition on line 567 was never true

568 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_content") 

569 if request.HasField("online_information"): 

570 geom = None 

571 address = None 

572 link = request.online_information.link 

573 elif request.HasField("offline_information"): 573 ↛ 586line 573 didn't jump to line 586 because the condition on line 573 was always true

574 if not ( 574 ↛ 579line 574 didn't jump to line 579 because the condition on line 574 was never true

575 request.offline_information.address 

576 and request.offline_information.lat 

577 and request.offline_information.lng 

578 ): 

579 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_or_location") 

580 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 580 ↛ 581line 580 didn't jump to line 581 because the condition on line 580 was never true

581 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

582 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

583 address = request.offline_information.address 

584 link = None 

585 else: 

586 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_location_or_link") 

587 

588 start_time = to_aware_datetime(request.start_time) 

589 end_time = to_aware_datetime(request.end_time) 

590 

591 _check_occurrence_time_validity(start_time, end_time, context) 

592 

593 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context) 

594 if not res: 594 ↛ 595line 594 didn't jump to line 595 because the condition on line 594 was never true

595 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

596 

597 event, occurrence = res 

598 

599 if not _can_edit_event(session, event, context.user_id): 599 ↛ 600line 599 didn't jump to line 600 because the condition on line 599 was never true

600 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

601 

602 if occurrence.is_cancelled: 602 ↛ 603line 602 didn't jump to line 603 because the condition on line 602 was never true

603 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

604 

605 if ( 605 ↛ 609line 605 didn't jump to line 609 because the condition on line 605 was never true

606 request.photo_key 

607 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

608 ): 

609 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "photo_not_found") 

610 

611 during = DateTimeTZRange(start_time, end_time) 

612 

613 # && is the overlap operator for ranges 

614 if ( 

615 session.execute( 

616 select(EventOccurrence.id) 

617 .where(EventOccurrence.event_id == event.id) 

618 .where(EventOccurrence.during.op("&&")(during)) 

619 .limit(1) 

620 ) 

621 .scalars() 

622 .one_or_none() 

623 is not None 

624 ): 

625 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_overlap") 

626 

627 new_occurrence: EventOccurrence | None = None 

628 

629 def create_occurrence(moderation_state_id: int) -> int: 

630 nonlocal new_occurrence 

631 new_occurrence = EventOccurrence( 

632 event_id=event.id, 

633 content=request.content, 

634 geom=geom, 

635 address=address, 

636 link=link, 

637 photo_key=request.photo_key if request.photo_key != "" else None, 

638 # timezone=timezone, 

639 during=during, 

640 creator_user_id=context.user_id, 

641 moderation_state_id=moderation_state_id, 

642 ) 

643 session.add(new_occurrence) 

644 session.flush() 

645 return new_occurrence.id 

646 

647 create_moderation( 

648 session=session, 

649 object_type=ModerationObjectType.event_occurrence, 

650 object_id=create_occurrence, 

651 creator_user_id=context.user_id, 

652 ) 

653 

654 assert new_occurrence is not None 

655 

656 session.add( 

657 EventOccurrenceAttendee( 

658 user_id=context.user_id, 

659 occurrence_id=new_occurrence.id, 

660 attendee_status=AttendeeStatus.going, 

661 ) 

662 ) 

663 

664 session.flush() 

665 

666 # TODO: notify 

667 

668 return event_to_pb(session, new_occurrence, context) 

669 

670 def UpdateEvent( 

671 self, request: events_pb2.UpdateEventReq, context: CouchersContext, session: Session 

672 ) -> events_pb2.Event: 

673 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

674 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context) 

675 if not res: 675 ↛ 676line 675 didn't jump to line 676 because the condition on line 675 was never true

676 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

677 

678 event, occurrence = res 

679 

680 if not _can_edit_event(session, event, context.user_id): 680 ↛ 681line 680 didn't jump to line 681 because the condition on line 680 was never true

681 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

682 

683 # the things that were updated and need to be notified about 

684 notify_updated = [] 

685 

686 if occurrence.is_cancelled: 

687 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

688 

689 occurrence_update: dict[str, Any] = {"last_edited": now()} 

690 

691 if request.HasField("title"): 

692 notify_updated.append("title") 

693 event.title = request.title.value 

694 

695 if request.HasField("content"): 

696 notify_updated.append("content") 

697 occurrence_update["content"] = request.content.value 

698 

699 if request.HasField("photo_key"): 699 ↛ 700line 699 didn't jump to line 700 because the condition on line 699 was never true

700 occurrence_update["photo_key"] = request.photo_key.value 

701 

702 if request.HasField("online_information"): 

703 notify_updated.append("location") 

704 if not request.online_information.link: 704 ↛ 705line 704 didn't jump to line 705 because the condition on line 704 was never true

705 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_requires_link") 

706 occurrence_update["link"] = request.online_information.link 

707 occurrence_update["geom"] = None 

708 occurrence_update["address"] = None 

709 elif request.HasField("offline_information"): 

710 notify_updated.append("location") 

711 occurrence_update["link"] = None 

712 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 712 ↛ 713line 712 didn't jump to line 713 because the condition on line 712 was never true

713 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

714 occurrence_update["geom"] = create_coordinate( 

715 request.offline_information.lat, request.offline_information.lng 

716 ) 

717 occurrence_update["address"] = request.offline_information.address 

718 

719 if request.HasField("start_time") or request.HasField("end_time"): 

720 if request.update_all_future: 720 ↛ 721line 720 didn't jump to line 721 because the condition on line 720 was never true

721 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_cant_update_all_times") 

722 if request.HasField("start_time"): 722 ↛ 726line 722 didn't jump to line 726 because the condition on line 722 was always true

723 notify_updated.append("start time") 

724 start_time = to_aware_datetime(request.start_time) 

725 else: 

726 start_time = occurrence.start_time 

727 if request.HasField("end_time"): 

728 notify_updated.append("end time") 

729 end_time = to_aware_datetime(request.end_time) 

730 else: 

731 end_time = occurrence.end_time 

732 

733 _check_occurrence_time_validity(start_time, end_time, context) 

734 

735 during = DateTimeTZRange(start_time, end_time) 

736 

737 # && is the overlap operator for ranges 

738 if ( 

739 session.execute( 

740 select(EventOccurrence.id) 

741 .where(EventOccurrence.event_id == event.id) 

742 .where(EventOccurrence.id != occurrence.id) 

743 .where(EventOccurrence.during.op("&&")(during)) 

744 .limit(1) 

745 ) 

746 .scalars() 

747 .one_or_none() 

748 is not None 

749 ): 

750 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_overlap") 

751 

752 occurrence_update["during"] = during 

753 

754 # TODO 

755 # if request.HasField("timezone"): 

756 # occurrence_update["timezone"] = request.timezone 

757 

758 # allow editing any event which hasn't ended more than 24 hours before now 

759 # when editing all future events, we edit all which have not yet ended 

760 

761 cutoff_time = now() - timedelta(hours=24) 

762 if request.update_all_future: 

763 session.execute( 

764 update(EventOccurrence) 

765 .where(EventOccurrence.end_time >= cutoff_time) 

766 .where(EventOccurrence.start_time >= occurrence.start_time) 

767 .values(occurrence_update) 

768 .execution_options(synchronize_session=False) 

769 ) 

770 else: 

771 if occurrence.end_time < cutoff_time: 771 ↛ 772line 771 didn't jump to line 772 because the condition on line 771 was never true

772 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

773 session.execute( 

774 update(EventOccurrence) 

775 .where(EventOccurrence.end_time >= cutoff_time) 

776 .where(EventOccurrence.id == occurrence.id) 

777 .values(occurrence_update) 

778 .execution_options(synchronize_session=False) 

779 ) 

780 

781 session.flush() 

782 

783 if notify_updated: 

784 if request.should_notify: 

785 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying") 

786 

787 queue_job( 

788 session, 

789 job=generate_event_update_notifications, 

790 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload( 

791 updating_user_id=user.id, 

792 occurrence_id=occurrence.id, 

793 updated_items=notify_updated, 

794 ), 

795 ) 

796 else: 

797 logger.info( 

798 f"Fields {','.join(notify_updated)} updated in event {event.id=}, but skipping notifications" 

799 ) 

800 

801 # since we have synchronize_session=False, we have to refresh the object 

802 session.refresh(occurrence) 

803 

804 return event_to_pb(session, occurrence, context) 

805 

def GetEvent(self, request: events_pb2.GetEventReq, context: CouchersContext, session: Session) -> events_pb2.Event:
    """Look up a single occurrence by ``request.event_id`` and return it as an ``events_pb2.Event``.

    Deleted occurrences are excluded and the moderation visibility filter is applied as a
    non-list lookup. Aborts with NOT_FOUND ("event_not_found") when nothing matches.
    """
    lookup = where_moderated_content_visible(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=False,
    )
    found = session.execute(lookup).scalar_one_or_none()

    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    return event_to_pb(session, found, context)

815 

def CancelEvent(
    self, request: events_pb2.CancelEventReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Mark the occurrence ``request.event_id`` as cancelled and queue cancellation notifications.

    Aborts with:
      * NOT_FOUND if the event/occurrence lookup returns nothing,
      * PERMISSION_DENIED if ``_can_edit_event`` rejects the calling user,
      * FAILED_PRECONDITION if the occurrence ended more than 24 hours ago.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = found

    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")

    # occurrences that ended more than a day ago can no longer be cancelled
    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_cancel_old_event")

    occurrence.is_cancelled = True

    log_event(context, session, "event.cancelled", {"event_id": event.id, "occurrence_id": occurrence.id})

    # notifications are produced by a background job rather than inline
    cancel_payload = jobs_pb2.GenerateEventCancelNotificationsPayload(
        cancelling_user_id=context.user_id,
        occurrence_id=occurrence.id,
    )
    queue_job(session, job=generate_event_cancel_notifications, payload=cancel_payload)

    return empty_pb2.Empty()

845 

def RequestCommunityInvite(
    self, request: events_pb2.RequestCommunityInviteReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Record a community-invite request for an occurrence and send the request email.

    Aborts with:
      * NOT_FOUND if the event/occurrence lookup returns nothing,
      * PERMISSION_DENIED if the caller cannot edit the event or the occurrence is cancelled,
      * FAILED_PRECONDITION if the occurrence ended more than 24 hours ago, if the calling
        user already has a request on this occurrence, or if any request on it is approved.
    """
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not res:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = res

    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    existing_requests = occurrence.community_invite_requests

    # a user may only ask once per occurrence
    if any(existing.user_id == context.user_id for existing in existing_requests):
        context.abort_with_error_code(
            grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_requested"
        )

    # once any request has been approved there is nothing left to ask for
    if any(existing.approved for existing in existing_requests):
        context.abort_with_error_code(
            grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_approved"
        )

    req = EventCommunityInviteRequest(
        occurrence_id=request.event_id,
        user_id=context.user_id,
    )
    session.add(req)
    # flush before handing the request row to the email helper
    session.flush()

    send_event_community_invite_request_email(session, req)

    return empty_pb2.Empty()

889 

def ListEventOccurrences(
    self, request: events_pb2.ListEventOccurrencesReq, context: CouchersContext, session: Session
) -> events_pb2.ListEventOccurrencesRes:
    """List occurrences belonging to the same event as occurrence ``request.event_id``.

    The page token is a timestamp (decoded with ``dt_from_millis``) marking where the previous
    page stopped; without one, listing starts from ``now()``. With ``request.past`` unset the
    results are occurrences ending after the token, ordered by ascending start time; with it
    set, occurrences ending before the token, ordered by descending start time. Cancelled
    occurrences are only returned when ``request.include_cancelled`` is set.

    Aborts with NOT_FOUND when the referenced occurrence cannot be found.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    if request.page_token:
        resume_from = dt_from_millis(int(request.page_token))
    else:
        resume_from = now()

    # first resolve the occurrence we were given, to learn which event it belongs to
    anchor_query = where_moderated_content_visible(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=False,
    )
    anchor = session.execute(anchor_query).scalar_one_or_none()
    if not anchor:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    listing = where_moderated_content_visible(
        select(EventOccurrence)
        .where(EventOccurrence.event_id == anchor.event_id)
        .where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=True,
    )

    if not request.include_cancelled:
        listing = listing.where(~EventOccurrence.is_cancelled)

    # the one-second slack keeps the occurrence the token was taken from inside the next page
    if request.past:
        listing = listing.where(EventOccurrence.end_time < resume_from + timedelta(seconds=1)).order_by(
            EventOccurrence.start_time.desc()
        )
    else:
        listing = listing.where(EventOccurrence.end_time > resume_from - timedelta(seconds=1)).order_by(
            EventOccurrence.start_time.asc()
        )

    # fetch one extra row so we can tell whether another page exists
    rows = session.execute(listing.limit(page_size + 1)).scalars().all()
    has_more = len(rows) > page_size

    return events_pb2.ListEventOccurrencesRes(
        events=[event_to_pb(session, row, context) for row in rows[:page_size]],
        next_page_token=str(millis_from_dt(rows[-1].end_time)) if has_more else None,
    )

930 

def ListEventAttendees(
    self, request: events_pb2.ListEventAttendeesReq, context: CouchersContext, session: Session
) -> events_pb2.ListEventAttendeesRes:
    """Return a page of user ids attending occurrence ``request.event_id``, ordered by user id.

    The page token is the user id at which the page starts (0 when absent). Attendee rows are
    passed through ``where_users_column_visible`` for the calling context.

    Aborts with NOT_FOUND when the occurrence cannot be found.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_user_id = int(request.page_token) if request.page_token else 0

    occurrence_lookup = where_moderated_content_visible(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=False,
    )
    occurrence = session.execute(occurrence_lookup).scalar_one_or_none()
    if not occurrence:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    # fetch one extra row so we can tell whether another page exists
    attendee_page = (
        select(EventOccurrenceAttendee)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
        .where(EventOccurrenceAttendee.user_id >= first_user_id)
        .order_by(EventOccurrenceAttendee.user_id)
        .limit(page_size + 1)
    )
    visible_page = where_users_column_visible(attendee_page, context, EventOccurrenceAttendee.user_id)
    rows = session.execute(visible_page).scalars().all()
    has_more = len(rows) > page_size

    return events_pb2.ListEventAttendeesRes(
        attendee_user_ids=[row.user_id for row in rows[:page_size]],
        next_page_token=str(rows[-1].user_id) if has_more else None,
    )

967 

def ListEventSubscribers(
    self, request: events_pb2.ListEventSubscribersReq, context: CouchersContext, session: Session
) -> events_pb2.ListEventSubscribersRes:
    """Return a page of user ids subscribed to the event behind occurrence ``request.event_id``.

    Results are ordered by user id; the page token is the user id at which the page starts
    (0 when absent). Subscription rows are passed through ``where_users_column_visible``.

    Aborts with NOT_FOUND when the event/occurrence lookup returns nothing.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_user_id = int(request.page_token) if request.page_token else 0

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
    event, occurrence = found

    # fetch one extra row so we can tell whether another page exists
    subscription_page = (
        select(EventSubscription)
        .where(EventSubscription.event_id == event.id)
        .where(EventSubscription.user_id >= first_user_id)
        .order_by(EventSubscription.user_id)
        .limit(page_size + 1)
    )
    visible_page = where_users_column_visible(subscription_page, context, EventSubscription.user_id)
    rows = session.execute(visible_page).scalars().all()
    has_more = len(rows) > page_size

    return events_pb2.ListEventSubscribersRes(
        subscriber_user_ids=[row.user_id for row in rows[:page_size]],
        next_page_token=str(rows[-1].user_id) if has_more else None,
    )

996 

def ListEventOrganizers(
    self, request: events_pb2.ListEventOrganizersReq, context: CouchersContext, session: Session
) -> events_pb2.ListEventOrganizersRes:
    """Return a page of organizer user ids for the event behind occurrence ``request.event_id``.

    Results are ordered by user id; the page token is the user id at which the page starts
    (0 when absent). Organizer rows are passed through ``where_users_column_visible``.

    Aborts with NOT_FOUND when the event/occurrence lookup returns nothing.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_user_id = int(request.page_token) if request.page_token else 0

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
    event, occurrence = found

    # fetch one extra row so we can tell whether another page exists
    organizer_page = (
        select(EventOrganizer)
        .where(EventOrganizer.event_id == event.id)
        .where(EventOrganizer.user_id >= first_user_id)
        .order_by(EventOrganizer.user_id)
        .limit(page_size + 1)
    )
    visible_page = where_users_column_visible(organizer_page, context, EventOrganizer.user_id)
    rows = session.execute(visible_page).scalars().all()
    has_more = len(rows) > page_size

    return events_pb2.ListEventOrganizersRes(
        organizer_user_ids=[row.user_id for row in rows[:page_size]],
        next_page_token=str(rows[-1].user_id) if has_more else None,
    )

1025 

def TransferEvent(
    self, request: events_pb2.TransferEventReq, context: CouchersContext, session: Session
) -> events_pb2.Event:
    """Transfer ownership of an event from a user to a group or community cluster.

    The target is selected by the ``new_owner`` oneof on the request:
      * ``new_owner_group_id``: the non-official cluster with that id,
      * ``new_owner_community_id``: the official cluster whose parent node has that id.

    Aborts with:
      * NOT_FOUND if the event/occurrence lookup returns nothing, or if no target cluster is
        found (including when the ``new_owner`` oneof is not set at all),
      * PERMISSION_DENIED if the caller cannot edit the event or the occurrence is cancelled,
      * FAILED_PRECONDITION if the occurrence ended more than 24 hours ago.

    Returns the occurrence rendered as an ``events_pb2.Event`` after the transfer.
    """
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not res:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = res

    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_transfer_permission_denied")

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    # Start from None so that a request with no ``new_owner`` set falls through to the
    # NOT_FOUND abort below instead of hitting an unbound local.
    cluster = None
    new_owner_kind = request.WhichOneof("new_owner")
    if new_owner_kind == "new_owner_group_id":
        cluster = session.execute(
            select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
        ).scalar_one_or_none()
    elif new_owner_kind == "new_owner_community_id":
        cluster = session.execute(
            select(Cluster)
            .where(Cluster.parent_node_id == request.new_owner_community_id)
            .where(Cluster.is_official_cluster)
        ).scalar_one_or_none()

    if not cluster:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_or_community_not_found")

    # ownership moves from the user to the cluster
    event.owner_user = None
    event.owner_cluster = cluster

    session.commit()
    return event_to_pb(session, occurrence, context)

1063 

def SetEventSubscription(
    self, request: events_pb2.SetEventSubscriptionReq, context: CouchersContext, session: Session
) -> events_pb2.Event:
    """Subscribe the calling user to, or unsubscribe them from, the event behind an occurrence.

    The call is a no-op on the subscription table when the requested state already holds.
    An "event.subscription_set" log event is recorded either way.

    Aborts with:
      * NOT_FOUND if the event/occurrence lookup returns nothing,
      * PERMISSION_DENIED if the occurrence is cancelled,
      * FAILED_PRECONDITION if the occurrence ended more than 24 hours ago.

    Returns the occurrence rendered as an ``events_pb2.Event``.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = found

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    existing = session.execute(
        select(EventSubscription)
        .where(EventSubscription.user_id == context.user_id)
        .where(EventSubscription.event_id == event.id)
    ).scalar_one_or_none()

    if request.subscribe:
        # only create a row when the user is not subscribed yet
        if not existing:
            session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
    elif existing:
        # unsubscribing and a row exists: drop it
        session.delete(existing)

    session.flush()

    log_event(
        context,
        session,
        "event.subscription_set",
        {"event_id": event.id, "occurrence_id": occurrence.id, "subscribed": request.subscribe},
    )

    return event_to_pb(session, occurrence, context)

1103 

def SetEventAttendance(
    self, request: events_pb2.SetEventAttendanceReq, context: CouchersContext, session: Session
) -> events_pb2.Event:
    """Set the calling user's attendance state for occurrence ``request.event_id``.

    ``ATTENDANCE_STATE_NOT_GOING`` removes any existing attendance row; any other state is
    mapped through ``attendancestate2sql`` and either updates the existing row or creates one.
    An "event.attendance_set" log event is recorded in every case.

    Aborts with:
      * NOT_FOUND if the occurrence cannot be found,
      * PERMISSION_DENIED if the occurrence is cancelled,
      * FAILED_PRECONDITION if the occurrence ended more than 24 hours ago.

    Returns the occurrence rendered as an ``events_pb2.Event``.
    """
    occurrence_lookup = where_moderated_content_visible(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=False,
    )
    occurrence = session.execute(occurrence_lookup).scalar_one_or_none()

    if not occurrence:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    existing = session.execute(
        select(EventOccurrenceAttendee)
        .where(EventOccurrenceAttendee.user_id == context.user_id)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
    ).scalar_one_or_none()

    not_going = request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING
    if not_going:
        # nothing to remove if the user never had an attendance row
        if existing:
            session.delete(existing)
    elif existing:
        existing.attendee_status = attendancestate2sql[request.attendance_state]  # type: ignore[assignment]
    else:
        # no row yet: create one with the requested status
        session.add(
            EventOccurrenceAttendee(
                user_id=context.user_id,
                occurrence_id=occurrence.id,
                attendee_status=not_none(attendancestate2sql[request.attendance_state]),
            )
        )

    session.flush()

    log_event(
        context,
        session,
        "event.attendance_set",
        {"occurrence_id": occurrence.id, "attendance_state": request.attendance_state},
    )

    return event_to_pb(session, occurrence, context)

1159 

def ListMyEvents(
    self, request: events_pb2.ListMyEventsReq, context: CouchersContext, session: Session
) -> events_pb2.ListMyEventsRes:
    """List occurrences related to the calling user.

    An occurrence is included when it matches at least one enabled category:
      * ``subscribed``: the user has a subscription to the event,
      * ``organizing``: the user is an organizer of the event,
      * ``attending``: the user has an attendance row for the occurrence,
      * ``my_communities``: the event's parent node is one whose official cluster the user
        is subscribed to.
    When none of the four flags is set, all four categories are enabled.

    Two pagination modes are supported:
      * page number (``request.page_number`` set): offset/limit paging starting from ``now()``;
        any page token is ignored,
      * page token (otherwise): the token is a timestamp decoded with ``dt_from_millis``
        marking where the previous page stopped.

    ``total_items`` is the count of all rows matching the filters before paging is applied.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    # the page token is a unix timestamp of where we left off
    page_token = (
        dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
    )
    # the page number is the page number we are on
    page_number = request.page_number or 1
    # Calculate the offset for pagination
    offset = (page_number - 1) * page_size
    query = (
        select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
    )
    query = where_moderated_content_visible(query, context, EventOccurrence, is_list_operation=True)

    include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities)
    include_subscribed = request.subscribed or include_all
    include_organizing = request.organizing or include_all
    include_attending = request.attending or include_all
    include_my_communities = request.my_communities or include_all

    # Each enabled category contributes one clause; they are OR-ed together below. The
    # relationship tables are outer-joined on the calling user, so a non-NULL user_id on the
    # joined side means "this user has such a row".
    where_ = []

    if include_subscribed:
        query = query.outerjoin(
            EventSubscription,
            and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
        )
        where_.append(EventSubscription.user_id.is_not(None))
    if include_organizing:
        query = query.outerjoin(
            EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id)
        )
        where_.append(EventOrganizer.user_id.is_not(None))
    if include_attending:
        query = query.outerjoin(
            EventOccurrenceAttendee,
            and_(
                EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                EventOccurrenceAttendee.user_id == context.user_id,
            ),
        )
        where_.append(EventOccurrenceAttendee.user_id.is_not(None))
    if include_my_communities:
        # ids of nodes whose official cluster the user is subscribed to
        my_communities = (
            session.execute(
                select(Node.id)
                .join(Cluster, Cluster.parent_node_id == Node.id)
                .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                .where(ClusterSubscription.user_id == context.user_id)
                .where(Cluster.is_official_cluster)
                .order_by(Node.id)
                .limit(100000)
            )
            .scalars()
            .all()
        )
        where_.append(Event.parent_node_id.in_(my_communities))

    query = query.where(or_(*where_))

    if request.my_communities_exclude_global:
        # keep only events whose parent node type compares greater than NodeType.region
        query = query.join(Node, Node.id == Event.parent_node_id).where(Node.node_type > NodeType.region)

    if not request.include_cancelled:
        query = query.where(~EventOccurrence.is_cancelled)

    # the one-second slack keeps the occurrence the token was taken from inside the next page
    if not request.past:
        cutoff = page_token - timedelta(seconds=1)
        query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc())
    else:
        cutoff = page_token + timedelta(seconds=1)
        query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc())
    # Count the total number of items for pagination
    total_items = session.execute(select(func.count()).select_from(query.subquery())).scalar()
    # Apply pagination by page number; in token mode fetch one extra row to detect a next page
    query = query.offset(offset).limit(page_size) if request.page_number else query.limit(page_size + 1)
    occurrences = session.execute(query).scalars().all()

    return events_pb2.ListMyEventsRes(
        events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
        next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
        total_items=total_items,
    )

1246 

def ListAllEvents(
    self, request: events_pb2.ListAllEventsReq, context: CouchersContext, session: Session
) -> events_pb2.ListAllEventsRes:
    """List non-deleted occurrences across all events.

    The page token is a timestamp (decoded with ``dt_from_millis``) marking where the previous
    page stopped; without one, listing starts from ``now()``. With ``request.past`` unset the
    results are occurrences ending after the token, ordered by ascending start time; with it
    set, occurrences ending before the token, ordered by descending start time. Cancelled
    occurrences are only returned when ``request.include_cancelled`` is set.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    if request.page_token:
        resume_from = dt_from_millis(int(request.page_token))
    else:
        resume_from = now()

    listing = where_moderated_content_visible(
        select(EventOccurrence).where(~EventOccurrence.is_deleted),
        context,
        EventOccurrence,
        is_list_operation=True,
    )

    if not request.include_cancelled:
        listing = listing.where(~EventOccurrence.is_cancelled)

    # the one-second slack keeps the occurrence the token was taken from inside the next page
    if request.past:
        listing = listing.where(EventOccurrence.end_time < resume_from + timedelta(seconds=1)).order_by(
            EventOccurrence.start_time.desc()
        )
    else:
        listing = listing.where(EventOccurrence.end_time > resume_from - timedelta(seconds=1)).order_by(
            EventOccurrence.start_time.asc()
        )

    # fetch one extra row so we can tell whether another page exists
    rows = session.execute(listing.limit(page_size + 1)).scalars().all()
    has_more = len(rows) > page_size

    return events_pb2.ListAllEventsRes(
        events=[event_to_pb(session, row, context) for row in rows[:page_size]],
        next_page_token=str(millis_from_dt(rows[-1].end_time)) if has_more else None,
    )

1274 

def InviteEventOrganizer(
    self, request: events_pb2.InviteEventOrganizerReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Add ``request.user_id`` as an organizer of the event and notify that user.

    Aborts with:
      * NOT_FOUND if the event/occurrence lookup returns nothing or the invited user is not
        found among the users visible to the caller,
      * PERMISSION_DENIED if the caller cannot edit the event or the occurrence is cancelled,
      * FAILED_PRECONDITION if the occurrence ended more than 24 hours ago.
    """
    inviter = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not found:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = found

    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    invitee_lookup = select(User).where(users_visible(context)).where(User.id == request.user_id)
    if not session.execute(invitee_lookup).scalar_one_or_none():
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    new_organizer = EventOrganizer(
        user_id=request.user_id,
        event_id=event.id,
    )
    session.add(new_organizer)
    session.flush()

    # the notification payload is rendered with a context built for the invited user
    invitee_context = make_background_user_context(user_id=request.user_id)

    invite_data = notification_data_pb2.EventInviteOrganizer(
        event=event_to_pb(session, occurrence, invitee_context),
        inviting_user=user_model_to_pb(inviter, session, invitee_context),
    )
    notify(
        session,
        user_id=request.user_id,
        topic_action=NotificationTopicAction.event__invite_organizer,
        key=str(event.id),
        data=invite_data,
    )

    return empty_pb2.Empty()

1321 

def RemoveEventOrganizer(
    self, request: events_pb2.RemoveEventOrganizerReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """Remove an organizer from the event behind occurrence ``request.event_id``.

    The organizer removed is ``request.user_id`` when that field is set, otherwise the
    calling user.

    Aborts with:
      * NOT_FOUND if the event/occurrence lookup returns nothing,
      * PERMISSION_DENIED if the occurrence is cancelled,
      * FAILED_PRECONDITION if the occurrence ended more than 24 hours ago, if the caller
        cannot edit the event, if the target user is the event owner, or if the target user
        is not an organizer of the event.
    """
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id, context=context)
    if not res:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    event, occurrence = res

    if occurrence.is_cancelled:
        context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")

    # Determine which user to remove
    user_id_to_remove = request.user_id.value if request.HasField("user_id") else context.user_id

    # Check permissions first, so that callers without edit rights cannot use the owner check
    # below to learn whether a given user id is the event owner.
    # NOTE(review): other methods report "event_edit_permission_denied" with PERMISSION_DENIED;
    # the status code is left as FAILED_PRECONDITION here to avoid changing what clients see.
    if not _can_edit_event(session, event, context.user_id):
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_edit_permission_denied")

    # Check if the target user is the event owner (only after permission check)
    if event.owner_user_id == user_id_to_remove:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_remove_owner_as_organizer")

    # Find the organizer to remove
    organizer_to_remove = session.execute(
        select(EventOrganizer)
        .where(EventOrganizer.user_id == user_id_to_remove)
        .where(EventOrganizer.event_id == event.id)
    ).scalar_one_or_none()

    if not organizer_to_remove:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_not_an_organizer")

    session.delete(organizer_to_remove)

    return empty_pb2.Empty()