Coverage for app / backend / src / couchers / servicers / events.py: 81%

522 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import logging 

2from datetime import datetime, timedelta 

3from typing import Any, cast 

4 

5import grpc 

6from google.protobuf import empty_pb2 

7from psycopg2.extras import DateTimeTZRange 

8from sqlalchemy import Select, select 

9from sqlalchemy.orm import Session 

10from sqlalchemy.sql import and_, func, or_, update 

11 

12from couchers.constants import GLOBAL_COMMUNITY_MAX_NODE_ID 

13from couchers.context import CouchersContext, make_background_user_context 

14from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope 

15from couchers.helpers.completed_profile import has_completed_profile 

16from couchers.jobs.enqueue import queue_job 

17from couchers.models import ( 

18 AttendeeStatus, 

19 Cluster, 

20 ClusterSubscription, 

21 Event, 

22 EventCommunityInviteRequest, 

23 EventOccurrence, 

24 EventOccurrenceAttendee, 

25 EventOrganizer, 

26 EventSubscription, 

27 Node, 

28 Thread, 

29 Upload, 

30 User, 

31) 

32from couchers.models.notifications import NotificationTopicAction 

33from couchers.notifications.notify import notify 

34from couchers.proto import events_pb2, events_pb2_grpc, notification_data_pb2 

35from couchers.proto.internal import jobs_pb2 

36from couchers.servicers.api import user_model_to_pb 

37from couchers.servicers.blocking import is_not_visible 

38from couchers.servicers.threads import thread_to_pb 

39from couchers.sql import users_visible, where_users_column_visible 

40from couchers.tasks import send_event_community_invite_request_email 

41from couchers.utils import ( 

42 Timestamp_from_datetime, 

43 create_coordinate, 

44 dt_from_millis, 

45 millis_from_dt, 

46 not_none, 

47 now, 

48 to_aware_datetime, 

49) 

50 

51logger = logging.getLogger(__name__) 

52 

53attendancestate2sql = { 

54 events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None, 

55 events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going, 

56 events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe, 

57} 

58 

59attendancestate2api = { 

60 None: events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING, 

61 AttendeeStatus.going: events_pb2.AttendanceState.ATTENDANCE_STATE_GOING, 

62 AttendeeStatus.maybe: events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE, 

63} 

64 

65MAX_PAGINATION_LENGTH = 25 

66 

67 

68def _is_event_owner(event: Event, user_id: int) -> bool: 

69 """ 

70 Checks whether the user can act as an owner of the event 

71 """ 

72 if event.owner_user: 

73 return event.owner_user_id == user_id 

74 # otherwise owned by a cluster 

75 return not_none(event.owner_cluster).admins.where(User.id == user_id).one_or_none() is not None 

76 

77 

78def _is_event_organizer(event: Event, user_id: int) -> bool: 

79 """ 

80 Checks whether the user is as an organizer of the event 

81 """ 

82 return event.organizers.where(EventOrganizer.user_id == user_id).one_or_none() is not None 

83 

84 

85def _can_moderate_event(session: Session, event: Event, user_id: int) -> bool: 

86 # if the event is owned by a cluster, then any moderator of that cluster can moderate this event 

87 if event.owner_cluster is not None and can_moderate_node(session, user_id, event.owner_cluster.parent_node_id): 

88 return True 

89 

90 # finally check if the user can moderate the parent node of the cluster 

91 return can_moderate_node(session, user_id, event.parent_node_id) 

92 

93 

94def _can_edit_event(session: Session, event: Event, user_id: int) -> bool: 

95 return ( 

96 _is_event_owner(event, user_id) 

97 or _is_event_organizer(event, user_id) 

98 or _can_moderate_event(session, event, user_id) 

99 ) 

100 

101 

102def event_to_pb(session: Session, occurrence: EventOccurrence, context: CouchersContext) -> events_pb2.Event: 

103 event = occurrence.event 

104 

105 next_occurrence = ( 

106 event.occurrences.where(EventOccurrence.end_time >= now()) 

107 .order_by(EventOccurrence.end_time.asc()) 

108 .limit(1) 

109 .one_or_none() 

110 ) 

111 

112 owner_community_id = None 

113 owner_group_id = None 

114 if event.owner_cluster: 

115 if event.owner_cluster.is_official_cluster: 

116 owner_community_id = event.owner_cluster.parent_node_id 

117 else: 

118 owner_group_id = event.owner_cluster.id 

119 

120 attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == context.user_id).one_or_none() 

121 attendance_state = attendance.attendee_status if attendance else None 

122 

123 can_moderate = _can_moderate_event(session, event, context.user_id) 

124 can_edit = _can_edit_event(session, event, context.user_id) 

125 

126 going_count = session.execute( 

127 where_users_column_visible( 

128 select(func.count()) 

129 .select_from(EventOccurrenceAttendee) 

130 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

131 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.going), 

132 context, 

133 EventOccurrenceAttendee.user_id, 

134 ) 

135 ).scalar_one() 

136 maybe_count = session.execute( 

137 where_users_column_visible( 

138 select(func.count()) 

139 .select_from(EventOccurrenceAttendee) 

140 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

141 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe), 

142 context, 

143 EventOccurrenceAttendee.user_id, 

144 ) 

145 ).scalar_one() 

146 

147 organizer_count = session.execute( 

148 where_users_column_visible( 

149 select(func.count()).select_from(EventOrganizer).where(EventOrganizer.event_id == event.id), 

150 context, 

151 EventOrganizer.user_id, 

152 ) 

153 ).scalar_one() 

154 subscriber_count = session.execute( 

155 where_users_column_visible( 

156 select(func.count()).select_from(EventSubscription).where(EventSubscription.event_id == event.id), 

157 context, 

158 EventSubscription.user_id, 

159 ) 

160 ).scalar_one() 

161 

162 return events_pb2.Event( 

163 event_id=occurrence.id, 

164 is_next=False if not next_occurrence else occurrence.id == next_occurrence.id, 

165 is_cancelled=occurrence.is_cancelled, 

166 is_deleted=occurrence.is_deleted, 

167 title=event.title, 

168 slug=event.slug, 

169 content=occurrence.content, 

170 photo_url=occurrence.photo.full_url if occurrence.photo else None, 

171 photo_key=occurrence.photo_key or "", 

172 online_information=( 

173 events_pb2.OnlineEventInformation( 

174 link=occurrence.link, 

175 ) 

176 if occurrence.link 

177 else None 

178 ), 

179 offline_information=( 

180 events_pb2.OfflineEventInformation( 

181 lat=not_none(occurrence.coordinates)[0], 

182 lng=not_none(occurrence.coordinates)[1], 

183 address=occurrence.address, 

184 ) 

185 if occurrence.geom 

186 else None 

187 ), 

188 created=Timestamp_from_datetime(occurrence.created), 

189 last_edited=Timestamp_from_datetime(occurrence.last_edited), 

190 creator_user_id=occurrence.creator_user_id, 

191 start_time=Timestamp_from_datetime(occurrence.start_time), 

192 end_time=Timestamp_from_datetime(occurrence.end_time), 

193 timezone=occurrence.timezone, 

194 start_time_display=str(occurrence.start_time), 

195 end_time_display=str(occurrence.end_time), 

196 attendance_state=attendancestate2api[attendance_state], 

197 organizer=event.organizers.where(EventOrganizer.user_id == context.user_id).one_or_none() is not None, 

198 subscriber=event.subscribers.where(EventSubscription.user_id == context.user_id).one_or_none() is not None, 

199 going_count=going_count, 

200 maybe_count=maybe_count, 

201 organizer_count=organizer_count, 

202 subscriber_count=subscriber_count, 

203 owner_user_id=event.owner_user_id, 

204 owner_community_id=owner_community_id, 

205 owner_group_id=owner_group_id, 

206 thread=thread_to_pb(session, event.thread_id), 

207 can_edit=can_edit, 

208 can_moderate=can_moderate, 

209 ) 

210 

211 

212def _get_event_and_occurrence_query(occurrence_id: int, include_deleted: bool) -> Select[tuple[Event, EventOccurrence]]: 

213 query = ( 

214 select(Event, EventOccurrence) 

215 .where(EventOccurrence.id == occurrence_id) 

216 .where(EventOccurrence.event_id == Event.id) 

217 ) 

218 

219 if not include_deleted: 219 ↛ 222line 219 didn't jump to line 222 because the condition on line 219 was always true

220 query = query.where(~EventOccurrence.is_deleted) 

221 

222 return query 

223 

224 

225def _get_event_and_occurrence_one( 

226 session: Session, occurrence_id: int, include_deleted: bool = False 

227) -> tuple[Event, EventOccurrence]: 

228 result = session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one() 

229 return result._tuple() 

230 

231 

232def _get_event_and_occurrence_one_or_none( 

233 session: Session, occurrence_id: int, include_deleted: bool = False 

234) -> tuple[Event, EventOccurrence] | None: 

235 result = session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one_or_none() 

236 return result._tuple() if result else None 

237 

238 

239def _check_occurrence_time_validity(start_time: datetime, end_time: datetime, context: CouchersContext) -> None: 

240 if start_time < now(): 

241 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_in_past") 

242 if end_time < start_time: 

243 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_ends_before_starts") 

244 if end_time - start_time > timedelta(days=7): 

245 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_too_long") 

246 if start_time - now() > timedelta(days=365): 

247 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_too_far_in_future") 

248 

249 

250def get_users_to_notify_for_new_event(session: Session, occurrence: EventOccurrence) -> tuple[list[User], int | None]: 

251 """ 

252 Returns the users to notify, as well as the community id that is being notified (None if based on geo search) 

253 """ 

254 cluster = occurrence.event.parent_node.official_cluster 

255 if cluster.parent_node_id <= GLOBAL_COMMUNITY_MAX_NODE_ID: 

256 logger.info("The Global Community is too big for email notifications.") 

257 return [], occurrence.event.parent_node_id 

258 elif occurrence.creator_user in cluster.admins or cluster.is_leaf: 258 ↛ 261line 258 didn't jump to line 261 because the condition on line 258 was always true

259 return list(cluster.members.where(User.is_visible)), occurrence.event.parent_node_id 

260 else: 

261 max_radius = 20000 # m 

262 users = ( 

263 session.execute( 

264 select(User) 

265 .join(ClusterSubscription, ClusterSubscription.user_id == User.id) 

266 .where(User.is_visible) 

267 .where(ClusterSubscription.cluster_id == cluster.id) 

268 .where(func.ST_DWithin(User.geom, occurrence.geom, max_radius / 111111)) 

269 ) 

270 .scalars() 

271 .all() 

272 ) 

273 return cast(tuple[list[User], int | None], (users, None)) 

274 

275 

276def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload) -> None: 

277 """ 

278 Background job to generated/fan out event notifications 

279 """ 

280 from couchers.servicers.communities import community_to_pb 

281 

282 logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}") 

283 

284 with session_scope() as session: 

285 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id) 

286 creator = occurrence.creator_user 

287 

288 users, node_id = get_users_to_notify_for_new_event(session, occurrence) 

289 

290 inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none() 

291 

292 if not inviting_user: 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true

293 logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?") 

294 return 

295 

296 for user in users: 

297 if is_not_visible(session, user.id, creator.id): 297 ↛ 298line 297 didn't jump to line 298 because the condition on line 297 was never true

298 continue 

299 context = make_background_user_context(user_id=user.id) 

300 topic_action = ( 

301 NotificationTopicAction.event__create_approved 

302 if payload.approved 

303 else NotificationTopicAction.event__create_any 

304 ) 

305 notify( 

306 session, 

307 user_id=user.id, 

308 topic_action=topic_action, 

309 key=str(payload.occurrence_id), 

310 data=notification_data_pb2.EventCreate( 

311 event=event_to_pb(session, occurrence, context), 

312 inviting_user=user_model_to_pb(inviting_user, session, context), 

313 nearby=True if node_id is None else None, 

314 in_community=community_to_pb(session, event.parent_node, context) if node_id is not None else None, 

315 ), 

316 ) 

317 

318 

319def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload) -> None: 

320 with session_scope() as session: 

321 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id) 

322 

323 updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one() 

324 

325 subscribed_user_ids = [user.id for user in event.subscribers] 

326 attending_user_ids = [user.user_id for user in occurrence.attendances] 

327 

328 for user_id in set(subscribed_user_ids + attending_user_ids): 

329 if is_not_visible(session, user_id, updating_user.id): 

330 continue 

331 context = make_background_user_context(user_id=user_id) 

332 notify( 

333 session, 

334 user_id=user_id, 

335 topic_action=NotificationTopicAction.event__update, 

336 key=str(payload.occurrence_id), 

337 data=notification_data_pb2.EventUpdate( 

338 event=event_to_pb(session, occurrence, context), 

339 updating_user=user_model_to_pb(updating_user, session, context), 

340 updated_items=payload.updated_items, 

341 ), 

342 ) 

343 

344 

345def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload) -> None: 

346 with session_scope() as session: 

347 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id) 

348 

349 cancelling_user = session.execute(select(User).where(User.id == payload.cancelling_user_id)).scalar_one() 

350 

351 subscribed_user_ids = [user.id for user in event.subscribers] 

352 attending_user_ids = [user.user_id for user in occurrence.attendances] 

353 

354 for user_id in set(subscribed_user_ids + attending_user_ids): 

355 if is_not_visible(session, user_id, cancelling_user.id): 

356 continue 

357 context = make_background_user_context(user_id=user_id) 

358 notify( 

359 session, 

360 user_id=user_id, 

361 topic_action=NotificationTopicAction.event__cancel, 

362 key=str(payload.occurrence_id), 

363 data=notification_data_pb2.EventCancel( 

364 event=event_to_pb(session, occurrence, context), 

365 cancelling_user=user_model_to_pb(cancelling_user, session, context), 

366 ), 

367 ) 

368 

369 

370def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload) -> None: 

371 with session_scope() as session: 

372 event, occurrence = _get_event_and_occurrence_one( 

373 session, occurrence_id=payload.occurrence_id, include_deleted=True 

374 ) 

375 

376 subscribed_user_ids = [user.id for user in event.subscribers] 

377 attending_user_ids = [user.user_id for user in occurrence.attendances] 

378 

379 for user_id in set(subscribed_user_ids + attending_user_ids): 

380 context = make_background_user_context(user_id=user_id) 

381 notify( 

382 session, 

383 user_id=user_id, 

384 topic_action=NotificationTopicAction.event__delete, 

385 key=str(payload.occurrence_id), 

386 data=notification_data_pb2.EventDelete( 

387 event=event_to_pb(session, occurrence, context), 

388 ), 

389 ) 

390 

391 

392class Events(events_pb2_grpc.EventsServicer): 

393 def CreateEvent( 

394 self, request: events_pb2.CreateEventReq, context: CouchersContext, session: Session 

395 ) -> events_pb2.Event: 

396 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

397 if not has_completed_profile(session, user): 

398 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_create_event") 

399 if not request.title: 

400 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_title") 

401 if not request.content: 

402 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_content") 

403 if request.HasField("online_information"): 

404 online = True 

405 geom = None 

406 address = None 

407 if not request.online_information.link: 

408 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_requires_link") 

409 link = request.online_information.link 

410 elif request.HasField("offline_information"): 410 ↛ 425line 410 didn't jump to line 425 because the condition on line 410 was always true

411 online = False 

412 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value 

413 if not ( 

414 request.offline_information.address 

415 and request.offline_information.lat 

416 and request.offline_information.lng 

417 ): 

418 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_or_location") 

419 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 419 ↛ 420line 419 didn't jump to line 420 because the condition on line 419 was never true

420 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

421 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

422 address = request.offline_information.address 

423 link = None 

424 else: 

425 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_location_or_link") 

426 

427 start_time = to_aware_datetime(request.start_time) 

428 end_time = to_aware_datetime(request.end_time) 

429 

430 _check_occurrence_time_validity(start_time, end_time, context) 

431 

432 if request.parent_community_id: 

433 parent_node = session.execute( 

434 select(Node).where(Node.id == request.parent_community_id) 

435 ).scalar_one_or_none() 

436 

437 if not parent_node or not parent_node.official_cluster.events_enabled: 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true

438 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled") 

439 else: 

440 if online: 

441 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_missing_parent_community") 

442 # parent community computed from geom 

443 parent_node = get_parent_node_at_location(session, not_none(geom)) 

444 

445 if not parent_node: 445 ↛ 446line 445 didn't jump to line 446 because the condition on line 445 was never true

446 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "community_not_found") 

447 

448 if ( 

449 request.photo_key 

450 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

451 ): 

452 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "photo_not_found") 

453 

454 thread = Thread() 

455 session.add(thread) 

456 session.flush() 

457 

458 event = Event( 

459 title=request.title, 

460 parent_node_id=parent_node.id, 

461 owner_user_id=context.user_id, 

462 thread_id=thread.id, 

463 creator_user_id=context.user_id, 

464 ) 

465 session.add(event) 

466 session.flush() 

467 

468 occurrence = EventOccurrence( 

469 event_id=event.id, 

470 content=request.content, 

471 geom=geom, 

472 address=address, 

473 link=link, 

474 photo_key=request.photo_key if request.photo_key != "" else None, 

475 # timezone=timezone, 

476 during=DateTimeTZRange(start_time, end_time), 

477 creator_user_id=context.user_id, 

478 ) 

479 session.add(occurrence) 

480 session.flush() 

481 

482 session.add( 

483 EventOrganizer( 

484 user_id=context.user_id, 

485 event_id=event.id, 

486 ) 

487 ) 

488 

489 session.add( 

490 EventSubscription( 

491 user_id=context.user_id, 

492 event_id=event.id, 

493 ) 

494 ) 

495 

496 session.add( 

497 EventOccurrenceAttendee( 

498 user_id=context.user_id, 

499 occurrence_id=occurrence.id, 

500 attendee_status=AttendeeStatus.going, 

501 ) 

502 ) 

503 

504 session.commit() 

505 

506 if has_completed_profile(session, user): 506 ↛ 517line 506 didn't jump to line 517 because the condition on line 506 was always true

507 queue_job( 

508 session, 

509 job=generate_event_create_notifications, 

510 payload=jobs_pb2.GenerateEventCreateNotificationsPayload( 

511 inviting_user_id=user.id, 

512 occurrence_id=occurrence.id, 

513 approved=False, 

514 ), 

515 ) 

516 

517 return event_to_pb(session, occurrence, context) 

518 

519 def ScheduleEvent( 

520 self, request: events_pb2.ScheduleEventReq, context: CouchersContext, session: Session 

521 ) -> events_pb2.Event: 

522 if not request.content: 522 ↛ 523line 522 didn't jump to line 523 because the condition on line 522 was never true

523 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_content") 

524 if request.HasField("online_information"): 

525 geom = None 

526 address = None 

527 link = request.online_information.link 

528 elif request.HasField("offline_information"): 528 ↛ 541line 528 didn't jump to line 541 because the condition on line 528 was always true

529 if not ( 529 ↛ 534line 529 didn't jump to line 534 because the condition on line 529 was never true

530 request.offline_information.address 

531 and request.offline_information.lat 

532 and request.offline_information.lng 

533 ): 

534 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_or_location") 

535 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 535 ↛ 536line 535 didn't jump to line 536 because the condition on line 535 was never true

536 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

537 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

538 address = request.offline_information.address 

539 link = None 

540 else: 

541 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_location_or_link") 

542 

543 start_time = to_aware_datetime(request.start_time) 

544 end_time = to_aware_datetime(request.end_time) 

545 

546 _check_occurrence_time_validity(start_time, end_time, context) 

547 

548 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

549 if not res: 549 ↛ 550line 549 didn't jump to line 550 because the condition on line 549 was never true

550 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

551 

552 event, occurrence = res 

553 

554 if not _can_edit_event(session, event, context.user_id): 554 ↛ 555line 554 didn't jump to line 555 because the condition on line 554 was never true

555 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

556 

557 if occurrence.is_cancelled: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true

558 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

559 

560 if ( 560 ↛ 564line 560 didn't jump to line 564 because the condition on line 560 was never true

561 request.photo_key 

562 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

563 ): 

564 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "photo_not_found") 

565 

566 during = DateTimeTZRange(start_time, end_time) 

567 

568 # && is the overlap operator for ranges 

569 if ( 

570 session.execute( 

571 select(EventOccurrence.id) 

572 .where(EventOccurrence.event_id == event.id) 

573 .where(EventOccurrence.during.op("&&")(during)) 

574 .limit(1) 

575 ) 

576 .scalars() 

577 .one_or_none() 

578 is not None 

579 ): 

580 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_overlap") 

581 

582 occurrence = EventOccurrence( 

583 event_id=event.id, 

584 content=request.content, 

585 geom=geom, 

586 address=address, 

587 link=link, 

588 photo_key=request.photo_key if request.photo_key != "" else None, 

589 # timezone=timezone, 

590 during=during, 

591 creator_user_id=context.user_id, 

592 ) 

593 session.add(occurrence) 

594 session.flush() 

595 

596 session.add( 

597 EventOccurrenceAttendee( 

598 user_id=context.user_id, 

599 occurrence_id=occurrence.id, 

600 attendee_status=AttendeeStatus.going, 

601 ) 

602 ) 

603 

604 session.flush() 

605 

606 # TODO: notify 

607 

608 return event_to_pb(session, occurrence, context) 

609 

610 def UpdateEvent( 

611 self, request: events_pb2.UpdateEventReq, context: CouchersContext, session: Session 

612 ) -> events_pb2.Event: 

613 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

614 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

615 if not res: 615 ↛ 616line 615 didn't jump to line 616 because the condition on line 615 was never true

616 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

617 

618 event, occurrence = res 

619 

620 if not _can_edit_event(session, event, context.user_id): 620 ↛ 621line 620 didn't jump to line 621 because the condition on line 620 was never true

621 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

622 

623 # the things that were updated and need to be notified about 

624 notify_updated = [] 

625 

626 if occurrence.is_cancelled: 

627 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

628 

629 occurrence_update: dict[str, Any] = {"last_edited": now()} 

630 

631 if request.HasField("title"): 

632 notify_updated.append("title") 

633 event.title = request.title.value 

634 

635 if request.HasField("content"): 

636 notify_updated.append("content") 

637 occurrence_update["content"] = request.content.value 

638 

639 if request.HasField("photo_key"): 639 ↛ 640line 639 didn't jump to line 640 because the condition on line 639 was never true

640 occurrence_update["photo_key"] = request.photo_key.value 

641 

642 if request.HasField("online_information"): 

643 notify_updated.append("location") 

644 if not request.online_information.link: 644 ↛ 645line 644 didn't jump to line 645 because the condition on line 644 was never true

645 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_requires_link") 

646 occurrence_update["link"] = request.online_information.link 

647 occurrence_update["geom"] = None 

648 occurrence_update["address"] = None 

649 elif request.HasField("offline_information"): 

650 notify_updated.append("location") 

651 occurrence_update["link"] = None 

652 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 652 ↛ 653line 652 didn't jump to line 653 because the condition on line 652 was never true

653 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate") 

654 occurrence_update["geom"] = create_coordinate( 

655 request.offline_information.lat, request.offline_information.lng 

656 ) 

657 occurrence_update["address"] = request.offline_information.address 

658 

659 if request.HasField("start_time") or request.HasField("end_time"): 

660 if request.update_all_future: 660 ↛ 661line 660 didn't jump to line 661 because the condition on line 660 was never true

661 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_cant_update_all_times") 

662 if request.HasField("start_time"): 662 ↛ 666line 662 didn't jump to line 666 because the condition on line 662 was always true

663 notify_updated.append("start time") 

664 start_time = to_aware_datetime(request.start_time) 

665 else: 

666 start_time = occurrence.start_time 

667 if request.HasField("end_time"): 

668 notify_updated.append("end time") 

669 end_time = to_aware_datetime(request.end_time) 

670 else: 

671 end_time = occurrence.end_time 

672 

673 _check_occurrence_time_validity(start_time, end_time, context) 

674 

675 during = DateTimeTZRange(start_time, end_time) 

676 

677 # && is the overlap operator for ranges 

678 if ( 

679 session.execute( 

680 select(EventOccurrence.id) 

681 .where(EventOccurrence.event_id == event.id) 

682 .where(EventOccurrence.id != occurrence.id) 

683 .where(EventOccurrence.during.op("&&")(during)) 

684 .limit(1) 

685 ) 

686 .scalars() 

687 .one_or_none() 

688 is not None 

689 ): 

690 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_overlap") 

691 

692 occurrence_update["during"] = during 

693 

694 # TODO 

695 # if request.HasField("timezone"): 

696 # occurrence_update["timezone"] = request.timezone 

697 

698 # allow editing any event which hasn't ended more than 24 hours before now 

699 # when editing all future events, we edit all which have not yet ended 

700 

701 cutoff_time = now() - timedelta(hours=24) 

702 if request.update_all_future: 

703 session.execute( 

704 update(EventOccurrence) 

705 .where(EventOccurrence.end_time >= cutoff_time) 

706 .where(EventOccurrence.start_time >= occurrence.start_time) 

707 .values(occurrence_update) 

708 .execution_options(synchronize_session=False) 

709 ) 

710 else: 

711 if occurrence.end_time < cutoff_time: 711 ↛ 712line 711 didn't jump to line 712 because the condition on line 711 was never true

712 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

713 session.execute( 

714 update(EventOccurrence) 

715 .where(EventOccurrence.end_time >= cutoff_time) 

716 .where(EventOccurrence.id == occurrence.id) 

717 .values(occurrence_update) 

718 .execution_options(synchronize_session=False) 

719 ) 

720 

721 session.flush() 

722 

723 if notify_updated: 

724 if request.should_notify: 

725 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying") 

726 

727 queue_job( 

728 session, 

729 job=generate_event_update_notifications, 

730 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload( 

731 updating_user_id=user.id, 

732 occurrence_id=occurrence.id, 

733 updated_items=notify_updated, 

734 ), 

735 ) 

736 else: 

737 logger.info( 

738 f"Fields {','.join(notify_updated)} updated in event {event.id=}, but skipping notifications" 

739 ) 

740 

741 # since we have synchronize_session=False, we have to refresh the object 

742 session.refresh(occurrence) 

743 

744 return event_to_pb(session, occurrence, context) 

745 

746 def GetEvent(self, request: events_pb2.GetEventReq, context: CouchersContext, session: Session) -> events_pb2.Event: 

747 occurrence = session.execute( 

748 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted) 

749 ).scalar_one_or_none() 

750 

751 if not occurrence: 751 ↛ 752line 751 didn't jump to line 752 because the condition on line 751 was never true

752 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

753 

754 return event_to_pb(session, occurrence, context) 

755 

756 def CancelEvent( 

757 self, request: events_pb2.CancelEventReq, context: CouchersContext, session: Session 

758 ) -> empty_pb2.Empty: 

759 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

760 if not res: 760 ↛ 761line 760 didn't jump to line 761 because the condition on line 760 was never true

761 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

762 

763 event, occurrence = res 

764 

765 if not _can_edit_event(session, event, context.user_id): 765 ↛ 766line 765 didn't jump to line 766 because the condition on line 765 was never true

766 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

767 

768 if occurrence.end_time < now() - timedelta(hours=24): 768 ↛ 769line 768 didn't jump to line 769 because the condition on line 768 was never true

769 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_cancel_old_event") 

770 

771 occurrence.is_cancelled = True 

772 

773 queue_job( 

774 session, 

775 job=generate_event_cancel_notifications, 

776 payload=jobs_pb2.GenerateEventCancelNotificationsPayload( 

777 cancelling_user_id=context.user_id, 

778 occurrence_id=occurrence.id, 

779 ), 

780 ) 

781 

782 return empty_pb2.Empty() 

783 

784 def RequestCommunityInvite( 

785 self, request: events_pb2.RequestCommunityInviteReq, context: CouchersContext, session: Session 

786 ) -> empty_pb2.Empty: 

787 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

788 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

789 if not res: 789 ↛ 790line 789 didn't jump to line 790 because the condition on line 789 was never true

790 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

791 

792 event, occurrence = res 

793 

794 if not _can_edit_event(session, event, context.user_id): 

795 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

796 

797 if occurrence.is_cancelled: 797 ↛ 798line 797 didn't jump to line 798 because the condition on line 797 was never true

798 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

799 

800 if occurrence.end_time < now() - timedelta(hours=24): 800 ↛ 801line 800 didn't jump to line 801 because the condition on line 800 was never true

801 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

802 

803 this_user_reqs = [req for req in occurrence.community_invite_requests if req.user_id == context.user_id] 

804 

805 if len(this_user_reqs) > 0: 

806 context.abort_with_error_code( 

807 grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_requested" 

808 ) 

809 

810 approved_reqs = [req for req in occurrence.community_invite_requests if req.approved] 

811 

812 if len(approved_reqs) > 0: 

813 context.abort_with_error_code( 

814 grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_approved" 

815 ) 

816 

817 req = EventCommunityInviteRequest( 

818 occurrence_id=request.event_id, 

819 user_id=context.user_id, 

820 ) 

821 session.add(req) 

822 session.flush() 

823 

824 send_event_community_invite_request_email(session, req) 

825 

826 return empty_pb2.Empty() 

827 

828 def ListEventOccurrences( 

829 self, request: events_pb2.ListEventOccurrencesReq, context: CouchersContext, session: Session 

830 ) -> events_pb2.ListEventOccurrencesRes: 

831 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

832 # the page token is a unix timestamp of where we left off 

833 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

834 occurrence = session.execute( 

835 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted) 

836 ).scalar_one_or_none() 

837 if not occurrence: 837 ↛ 838line 837 didn't jump to line 838 because the condition on line 837 was never true

838 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

839 

840 query = select(EventOccurrence).where(EventOccurrence.event_id == Event.id).where(~EventOccurrence.is_deleted) 

841 

842 if not request.include_cancelled: 

843 query = query.where(~EventOccurrence.is_cancelled) 

844 

845 if not request.past: 845 ↛ 849line 845 didn't jump to line 849 because the condition on line 845 was always true

846 cutoff = page_token - timedelta(seconds=1) 

847 query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc()) 

848 else: 

849 cutoff = page_token + timedelta(seconds=1) 

850 query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc()) 

851 

852 query = query.limit(page_size + 1) 

853 occurrences = session.execute(query).scalars().all() 

854 

855 return events_pb2.ListEventOccurrencesRes( 

856 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

857 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

858 ) 

859 

860 def ListEventAttendees( 

861 self, request: events_pb2.ListEventAttendeesReq, context: CouchersContext, session: Session 

862 ) -> events_pb2.ListEventAttendeesRes: 

863 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

864 next_user_id = int(request.page_token) if request.page_token else 0 

865 occurrence = session.execute( 

866 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted) 

867 ).scalar_one_or_none() 

868 if not occurrence: 868 ↛ 869line 868 didn't jump to line 869 because the condition on line 868 was never true

869 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

870 attendees = ( 

871 session.execute( 

872 where_users_column_visible( 

873 select(EventOccurrenceAttendee) 

874 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

875 .where(EventOccurrenceAttendee.user_id >= next_user_id) 

876 .order_by(EventOccurrenceAttendee.user_id) 

877 .limit(page_size + 1), 

878 context, 

879 EventOccurrenceAttendee.user_id, 

880 ) 

881 ) 

882 .scalars() 

883 .all() 

884 ) 

885 return events_pb2.ListEventAttendeesRes( 

886 attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]], 

887 next_page_token=str(attendees[-1].user_id) if len(attendees) > page_size else None, 

888 ) 

889 

890 def ListEventSubscribers( 

891 self, request: events_pb2.ListEventSubscribersReq, context: CouchersContext, session: Session 

892 ) -> events_pb2.ListEventSubscribersRes: 

893 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

894 next_user_id = int(request.page_token) if request.page_token else 0 

895 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

896 if not res: 896 ↛ 897line 896 didn't jump to line 897 because the condition on line 896 was never true

897 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

898 event, occurrence = res 

899 subscribers = ( 

900 session.execute( 

901 where_users_column_visible( 

902 select(EventSubscription) 

903 .where(EventSubscription.event_id == event.id) 

904 .where(EventSubscription.user_id >= next_user_id) 

905 .order_by(EventSubscription.user_id) 

906 .limit(page_size + 1), 

907 context, 

908 EventSubscription.user_id, 

909 ) 

910 ) 

911 .scalars() 

912 .all() 

913 ) 

914 return events_pb2.ListEventSubscribersRes( 

915 subscriber_user_ids=[subscriber.user_id for subscriber in subscribers[:page_size]], 

916 next_page_token=str(subscribers[-1].user_id) if len(subscribers) > page_size else None, 

917 ) 

918 

919 def ListEventOrganizers( 

920 self, request: events_pb2.ListEventOrganizersReq, context: CouchersContext, session: Session 

921 ) -> events_pb2.ListEventOrganizersRes: 

922 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

923 next_user_id = int(request.page_token) if request.page_token else 0 

924 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

925 if not res: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true

926 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

927 event, occurrence = res 

928 organizers = ( 

929 session.execute( 

930 where_users_column_visible( 

931 select(EventOrganizer) 

932 .where(EventOrganizer.event_id == event.id) 

933 .where(EventOrganizer.user_id >= next_user_id) 

934 .order_by(EventOrganizer.user_id) 

935 .limit(page_size + 1), 

936 context, 

937 EventOrganizer.user_id, 

938 ) 

939 ) 

940 .scalars() 

941 .all() 

942 ) 

943 return events_pb2.ListEventOrganizersRes( 

944 organizer_user_ids=[organizer.user_id for organizer in organizers[:page_size]], 

945 next_page_token=str(organizers[-1].user_id) if len(organizers) > page_size else None, 

946 ) 

947 

948 def TransferEvent( 

949 self, request: events_pb2.TransferEventReq, context: CouchersContext, session: Session 

950 ) -> events_pb2.Event: 

951 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

952 if not res: 952 ↛ 953line 952 didn't jump to line 953 because the condition on line 952 was never true

953 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

954 

955 event, occurrence = res 

956 

957 if not _can_edit_event(session, event, context.user_id): 

958 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_transfer_permission_denied") 

959 

960 if occurrence.is_cancelled: 

961 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

962 

963 if occurrence.end_time < now() - timedelta(hours=24): 963 ↛ 964line 963 didn't jump to line 964 because the condition on line 963 was never true

964 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

965 

966 if request.WhichOneof("new_owner") == "new_owner_group_id": 

967 cluster = session.execute( 

968 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id) 

969 ).scalar_one_or_none() 

970 elif request.WhichOneof("new_owner") == "new_owner_community_id": 970 ↛ 977line 970 didn't jump to line 977 because the condition on line 970 was always true

971 cluster = session.execute( 

972 select(Cluster) 

973 .where(Cluster.parent_node_id == request.new_owner_community_id) 

974 .where(Cluster.is_official_cluster) 

975 ).scalar_one_or_none() 

976 

977 if not cluster: 977 ↛ 978line 977 didn't jump to line 978 because the condition on line 977 was never true

978 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_or_community_not_found") 

979 

980 event.owner_user = None 

981 event.owner_cluster = cluster 

982 

983 session.commit() 

984 return event_to_pb(session, occurrence, context) 

985 

986 def SetEventSubscription( 

987 self, request: events_pb2.SetEventSubscriptionReq, context: CouchersContext, session: Session 

988 ) -> events_pb2.Event: 

989 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

990 if not res: 990 ↛ 991line 990 didn't jump to line 991 because the condition on line 990 was never true

991 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

992 

993 event, occurrence = res 

994 

995 if occurrence.is_cancelled: 

996 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

997 

998 if occurrence.end_time < now() - timedelta(hours=24): 998 ↛ 999line 998 didn't jump to line 999 because the condition on line 998 was never true

999 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

1000 

1001 current_subscription = session.execute( 

1002 select(EventSubscription) 

1003 .where(EventSubscription.user_id == context.user_id) 

1004 .where(EventSubscription.event_id == event.id) 

1005 ).scalar_one_or_none() 

1006 

1007 # if not subscribed, subscribe 

1008 if request.subscribe and not current_subscription: 

1009 session.add(EventSubscription(user_id=context.user_id, event_id=event.id)) 

1010 

1011 # if subscribed but unsubbing, remove subscription 

1012 if not request.subscribe and current_subscription: 

1013 session.delete(current_subscription) 

1014 

1015 session.flush() 

1016 

1017 return event_to_pb(session, occurrence, context) 

1018 

1019 def SetEventAttendance( 

1020 self, request: events_pb2.SetEventAttendanceReq, context: CouchersContext, session: Session 

1021 ) -> events_pb2.Event: 

1022 occurrence = session.execute( 

1023 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted) 

1024 ).scalar_one_or_none() 

1025 

1026 if not occurrence: 1026 ↛ 1027line 1026 didn't jump to line 1027 because the condition on line 1026 was never true

1027 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

1028 

1029 if occurrence.is_cancelled: 

1030 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

1031 

1032 if occurrence.end_time < now() - timedelta(hours=24): 1032 ↛ 1033line 1032 didn't jump to line 1033 because the condition on line 1032 was never true

1033 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

1034 

1035 current_attendance = session.execute( 

1036 select(EventOccurrenceAttendee) 

1037 .where(EventOccurrenceAttendee.user_id == context.user_id) 

1038 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

1039 ).scalar_one_or_none() 

1040 

1041 if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING: 

1042 if current_attendance: 1042 ↛ 1057line 1042 didn't jump to line 1057 because the condition on line 1042 was always true

1043 session.delete(current_attendance) 

1044 # if unset/not going, nothing to do! 

1045 else: 

1046 if current_attendance: 

1047 current_attendance.attendee_status = attendancestate2sql[request.attendance_state] # type: ignore[assignment] 

1048 else: 

1049 # create new 

1050 attendance = EventOccurrenceAttendee( 

1051 user_id=context.user_id, 

1052 occurrence_id=occurrence.id, 

1053 attendee_status=not_none(attendancestate2sql[request.attendance_state]), 

1054 ) 

1055 session.add(attendance) 

1056 

1057 session.flush() 

1058 

1059 return event_to_pb(session, occurrence, context) 

1060 

1061 def ListMyEvents( 

1062 self, request: events_pb2.ListMyEventsReq, context: CouchersContext, session: Session 

1063 ) -> events_pb2.ListMyEventsRes: 

1064 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

1065 # the page token is a unix timestamp of where we left off 

1066 page_token = ( 

1067 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now() 

1068 ) 

1069 # the page number is the page number we are on 

1070 page_number = request.page_number or 1 

1071 # Calculate the offset for pagination 

1072 offset = (page_number - 1) * page_size 

1073 query = ( 

1074 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted) 

1075 ) 

1076 

1077 include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities) 

1078 include_subscribed = request.subscribed or include_all 

1079 include_organizing = request.organizing or include_all 

1080 include_attending = request.attending or include_all 

1081 include_my_communities = request.my_communities or include_all 

1082 

1083 where_ = [] 

1084 

1085 if include_subscribed: 

1086 query = query.outerjoin( 

1087 EventSubscription, 

1088 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id), 

1089 ) 

1090 where_.append(EventSubscription.user_id != None) 

1091 if include_organizing: 

1092 query = query.outerjoin( 

1093 EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id) 

1094 ) 

1095 where_.append(EventOrganizer.user_id != None) 

1096 if include_attending: 

1097 query = query.outerjoin( 

1098 EventOccurrenceAttendee, 

1099 and_( 

1100 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id, 

1101 EventOccurrenceAttendee.user_id == context.user_id, 

1102 ), 

1103 ) 

1104 where_.append(EventOccurrenceAttendee.user_id != None) 

1105 if include_my_communities: 

1106 my_communities = ( 

1107 session.execute( 

1108 select(Node.id) 

1109 .join(Cluster, Cluster.parent_node_id == Node.id) 

1110 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

1111 .where(ClusterSubscription.user_id == context.user_id) 

1112 .where(Cluster.is_official_cluster) 

1113 .order_by(Node.id) 

1114 .limit(100000) 

1115 ) 

1116 .scalars() 

1117 .all() 

1118 ) 

1119 where_.append(Event.parent_node_id.in_(my_communities)) 

1120 

1121 query = query.where(or_(*where_)) 

1122 

1123 if request.my_communities_exclude_global: 

1124 query = query.where(Event.parent_node_id > GLOBAL_COMMUNITY_MAX_NODE_ID) 

1125 

1126 if not request.include_cancelled: 

1127 query = query.where(~EventOccurrence.is_cancelled) 

1128 

1129 if not request.past: 1129 ↛ 1133line 1129 didn't jump to line 1133 because the condition on line 1129 was always true

1130 cutoff = page_token - timedelta(seconds=1) 

1131 query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc()) 

1132 else: 

1133 cutoff = page_token + timedelta(seconds=1) 

1134 query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc()) 

1135 # Count the total number of items for pagination 

1136 total_items = session.execute(select(func.count()).select_from(query.subquery())).scalar() 

1137 # Apply pagination by page number 

1138 query = query.offset(offset).limit(page_size) if request.page_number else query.limit(page_size + 1) 

1139 occurrences = session.execute(query).scalars().all() 

1140 

1141 return events_pb2.ListMyEventsRes( 

1142 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

1143 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

1144 total_items=total_items, 

1145 ) 

1146 

1147 def ListAllEvents( 

1148 self, request: events_pb2.ListAllEventsReq, context: CouchersContext, session: Session 

1149 ) -> events_pb2.ListAllEventsRes: 

1150 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

1151 # the page token is a unix timestamp of where we left off 

1152 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

1153 

1154 query = ( 

1155 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted) 

1156 ) 

1157 

1158 if not request.include_cancelled: 1158 ↛ 1161line 1158 didn't jump to line 1161 because the condition on line 1158 was always true

1159 query = query.where(~EventOccurrence.is_cancelled) 

1160 

1161 if not request.past: 

1162 cutoff = page_token - timedelta(seconds=1) 

1163 query = query.where(EventOccurrence.end_time > cutoff).order_by(EventOccurrence.start_time.asc()) 

1164 else: 

1165 cutoff = page_token + timedelta(seconds=1) 

1166 query = query.where(EventOccurrence.end_time < cutoff).order_by(EventOccurrence.start_time.desc()) 

1167 

1168 query = query.limit(page_size + 1) 

1169 occurrences = session.execute(query).scalars().all() 

1170 

1171 return events_pb2.ListAllEventsRes( 

1172 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

1173 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

1174 ) 

1175 

1176 def InviteEventOrganizer( 

1177 self, request: events_pb2.InviteEventOrganizerReq, context: CouchersContext, session: Session 

1178 ) -> empty_pb2.Empty: 

1179 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

1180 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

1181 if not res: 1181 ↛ 1182line 1181 didn't jump to line 1182 because the condition on line 1181 was never true

1182 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

1183 

1184 event, occurrence = res 

1185 

1186 if not _can_edit_event(session, event, context.user_id): 

1187 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied") 

1188 

1189 if occurrence.is_cancelled: 

1190 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

1191 

1192 if occurrence.end_time < now() - timedelta(hours=24): 1192 ↛ 1193line 1192 didn't jump to line 1193 because the condition on line 1192 was never true

1193 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

1194 

1195 if not session.execute( 1195 ↛ 1198line 1195 didn't jump to line 1198 because the condition on line 1195 was never true

1196 select(User).where(users_visible(context)).where(User.id == request.user_id) 

1197 ).scalar_one_or_none(): 

1198 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

1199 

1200 session.add( 

1201 EventOrganizer( 

1202 user_id=request.user_id, 

1203 event_id=event.id, 

1204 ) 

1205 ) 

1206 session.flush() 

1207 

1208 other_user_context = make_background_user_context(user_id=request.user_id) 

1209 

1210 notify( 

1211 session, 

1212 user_id=request.user_id, 

1213 topic_action=NotificationTopicAction.event__invite_organizer, 

1214 key=str(event.id), 

1215 data=notification_data_pb2.EventInviteOrganizer( 

1216 event=event_to_pb(session, occurrence, other_user_context), 

1217 inviting_user=user_model_to_pb(user, session, other_user_context), 

1218 ), 

1219 ) 

1220 

1221 return empty_pb2.Empty() 

1222 

1223 def RemoveEventOrganizer( 

1224 self, request: events_pb2.RemoveEventOrganizerReq, context: CouchersContext, session: Session 

1225 ) -> empty_pb2.Empty: 

1226 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id) 

1227 if not res: 1227 ↛ 1228line 1227 didn't jump to line 1228 because the condition on line 1227 was never true

1228 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

1229 

1230 event, occurrence = res 

1231 

1232 if occurrence.is_cancelled: 1232 ↛ 1233line 1232 didn't jump to line 1233 because the condition on line 1232 was never true

1233 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event") 

1234 

1235 if occurrence.end_time < now() - timedelta(hours=24): 1235 ↛ 1236line 1235 didn't jump to line 1236 because the condition on line 1235 was never true

1236 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event") 

1237 

1238 # Determine which user to remove 

1239 user_id_to_remove = request.user_id.value if request.HasField("user_id") else context.user_id 

1240 

1241 # Check if the target user is the event owner (only after permission check) 

1242 if event.owner_user_id == user_id_to_remove: 

1243 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_remove_owner_as_organizer") 

1244 

1245 # Check permissions: either an organizer removing an organizer OR you're the event owner 

1246 if not _can_edit_event(session, event, context.user_id): 

1247 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_edit_permission_denied") 

1248 

1249 # Find the organizer to remove 

1250 organizer_to_remove = session.execute( 

1251 select(EventOrganizer) 

1252 .where(EventOrganizer.user_id == user_id_to_remove) 

1253 .where(EventOrganizer.event_id == event.id) 

1254 ).scalar_one_or_none() 

1255 

1256 if not organizer_to_remove: 1256 ↛ 1257line 1256 didn't jump to line 1257 because the condition on line 1256 was never true

1257 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_not_an_organizer") 

1258 

1259 session.delete(organizer_to_remove) 

1260 

1261 return empty_pb2.Empty()