Coverage for src/couchers/servicers/events.py: 84%
499 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from psycopg2.extras import DateTimeTZRange
7from sqlalchemy.sql import and_, func, or_, select, update
9from couchers.context import make_background_user_context
10from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope
11from couchers.jobs.enqueue import queue_job
12from couchers.models import (
13 AttendeeStatus,
14 Cluster,
15 ClusterSubscription,
16 Event,
17 EventCommunityInviteRequest,
18 EventOccurrence,
19 EventOccurrenceAttendee,
20 EventOrganizer,
21 EventSubscription,
22 Node,
23 Thread,
24 Upload,
25 User,
26)
27from couchers.notifications.notify import notify
28from couchers.proto import events_pb2, events_pb2_grpc, notification_data_pb2
29from couchers.proto.internal import jobs_pb2
30from couchers.servicers.api import user_model_to_pb
31from couchers.servicers.blocking import is_not_visible
32from couchers.servicers.threads import thread_to_pb
33from couchers.sql import couchers_select as select
34from couchers.tasks import send_event_community_invite_request_email
35from couchers.utils import (
36 Timestamp_from_datetime,
37 create_coordinate,
38 dt_from_millis,
39 millis_from_dt,
40 now,
41 to_aware_datetime,
42)
44logger = logging.getLogger(__name__)
46attendancestate2sql = {
47 events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
48 events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
49 events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
50}
52attendancestate2api = {
53 None: events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING,
54 AttendeeStatus.going: events_pb2.AttendanceState.ATTENDANCE_STATE_GOING,
55 AttendeeStatus.maybe: events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE,
56}
58MAX_PAGINATION_LENGTH = 25
61def _is_event_owner(event: Event, user_id):
62 """
63 Checks whether the user can act as an owner of the event
64 """
65 if event.owner_user:
66 return event.owner_user_id == user_id
67 # otherwise owned by a cluster
68 return event.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None
71def _is_event_organizer(event: Event, user_id):
72 """
73 Checks whether the user is as an organizer of the event
74 """
75 return event.organizers.where(EventOrganizer.user_id == user_id).one_or_none() is not None
78def _can_moderate_event(session, event: Event, user_id):
79 # if the event is owned by a cluster, then any moderator of that cluster can moderate this event
80 if event.owner_cluster is not None and can_moderate_node(session, user_id, event.owner_cluster.parent_node_id):
81 return True
83 # finally check if the user can moderate the parent node of the cluster
84 return can_moderate_node(session, user_id, event.parent_node_id)
87def _can_edit_event(session, event, user_id):
88 return (
89 _is_event_owner(event, user_id)
90 or _is_event_organizer(event, user_id)
91 or _can_moderate_event(session, event, user_id)
92 )
95def event_to_pb(session, occurrence: EventOccurrence, context):
96 event = occurrence.event
98 next_occurrence = (
99 event.occurrences.where(EventOccurrence.end_time >= now())
100 .order_by(EventOccurrence.end_time.asc())
101 .limit(1)
102 .one_or_none()
103 )
105 owner_community_id = None
106 owner_group_id = None
107 if event.owner_cluster:
108 if event.owner_cluster.is_official_cluster:
109 owner_community_id = event.owner_cluster.parent_node_id
110 else:
111 owner_group_id = event.owner_cluster.id
113 attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == context.user_id).one_or_none()
114 attendance_state = attendance.attendee_status if attendance else None
116 can_moderate = _can_moderate_event(session, event, context.user_id)
117 can_edit = _can_edit_event(session, event, context.user_id)
119 going_count = session.execute(
120 select(func.count())
121 .select_from(EventOccurrenceAttendee)
122 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
123 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
124 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.going)
125 ).scalar_one()
126 maybe_count = session.execute(
127 select(func.count())
128 .select_from(EventOccurrenceAttendee)
129 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
130 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
131 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe)
132 ).scalar_one()
134 organizer_count = session.execute(
135 select(func.count())
136 .select_from(EventOrganizer)
137 .where_users_column_visible(context, EventOrganizer.user_id)
138 .where(EventOrganizer.event_id == event.id)
139 ).scalar_one()
140 subscriber_count = session.execute(
141 select(func.count())
142 .select_from(EventSubscription)
143 .where_users_column_visible(context, EventSubscription.user_id)
144 .where(EventSubscription.event_id == event.id)
145 ).scalar_one()
147 return events_pb2.Event(
148 event_id=occurrence.id,
149 is_next=False if not next_occurrence else occurrence.id == next_occurrence.id,
150 is_cancelled=occurrence.is_cancelled,
151 is_deleted=occurrence.is_deleted,
152 title=event.title,
153 slug=event.slug,
154 content=occurrence.content,
155 photo_url=occurrence.photo.full_url if occurrence.photo else None,
156 online_information=(
157 events_pb2.OnlineEventInformation(
158 link=occurrence.link,
159 )
160 if occurrence.link
161 else None
162 ),
163 offline_information=(
164 events_pb2.OfflineEventInformation(
165 lat=occurrence.coordinates[0],
166 lng=occurrence.coordinates[1],
167 address=occurrence.address,
168 )
169 if occurrence.geom
170 else None
171 ),
172 created=Timestamp_from_datetime(occurrence.created),
173 last_edited=Timestamp_from_datetime(occurrence.last_edited),
174 creator_user_id=occurrence.creator_user_id,
175 start_time=Timestamp_from_datetime(occurrence.start_time),
176 end_time=Timestamp_from_datetime(occurrence.end_time),
177 timezone=occurrence.timezone,
178 start_time_display=str(occurrence.start_time),
179 end_time_display=str(occurrence.end_time),
180 attendance_state=attendancestate2api[attendance_state],
181 organizer=event.organizers.where(EventOrganizer.user_id == context.user_id).one_or_none() is not None,
182 subscriber=event.subscribers.where(EventSubscription.user_id == context.user_id).one_or_none() is not None,
183 going_count=going_count,
184 maybe_count=maybe_count,
185 organizer_count=organizer_count,
186 subscriber_count=subscriber_count,
187 owner_user_id=event.owner_user_id,
188 owner_community_id=owner_community_id,
189 owner_group_id=owner_group_id,
190 thread=thread_to_pb(session, event.thread_id),
191 can_edit=can_edit,
192 can_moderate=can_moderate,
193 )
196def _get_event_and_occurrence_query(occurrence_id, include_deleted: bool):
197 query = (
198 select(Event, EventOccurrence)
199 .where(EventOccurrence.id == occurrence_id)
200 .where(EventOccurrence.event_id == Event.id)
201 )
203 if not include_deleted:
204 query = query.where(~EventOccurrence.is_deleted)
206 return query
209def _get_event_and_occurrence_one(
210 session, occurrence_id, include_deleted: bool = False
211) -> tuple[Event, EventOccurrence]:
212 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one()
215def _get_event_and_occurrence_one_or_none(
216 session, occurrence_id, include_deleted: bool = False
217) -> tuple[Event, EventOccurrence] | None:
218 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one_or_none()
221def _check_occurrence_time_validity(start_time, end_time, context):
222 if start_time < now():
223 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_in_past")
224 if end_time < start_time:
225 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_ends_before_starts")
226 if end_time - start_time > timedelta(days=7):
227 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_too_long")
228 if start_time - now() > timedelta(days=365):
229 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_too_far_in_future")
232def get_users_to_notify_for_new_event(session, occurrence):
233 """
234 Returns the users to notify, as well as the community id that is being notified (None if based on geo search)
235 """
236 cluster = occurrence.event.parent_node.official_cluster
237 if cluster.parent_node_id == 1:
238 logger.info("The Global Community is too big for email notifications.")
239 return [], occurrence.event.parent_node_id
240 elif occurrence.creator_user in cluster.admins or cluster.is_leaf:
241 return list(cluster.members.where(User.is_visible)), occurrence.event.parent_node_id
242 else:
243 max_radius = 20000 # m
244 users = (
245 session.execute(
246 select(User)
247 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
248 .where(User.is_visible)
249 .where(ClusterSubscription.cluster_id == cluster.id)
250 .where(func.ST_DWithin(User.geom, occurrence.geom, max_radius / 111111))
251 )
252 .scalars()
253 .all()
254 )
255 return users, None
258def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload):
259 """
260 Background job to generated/fan out event notifications
261 """
262 from couchers.servicers.communities import community_to_pb
264 logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}")
266 with session_scope() as session:
267 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
268 creator = occurrence.creator_user
270 users, node_id = get_users_to_notify_for_new_event(session, occurrence)
272 inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none()
274 if not inviting_user:
275 logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?")
276 return
278 for user in users:
279 if is_not_visible(session, user.id, creator.id):
280 continue
281 context = make_background_user_context(user_id=user.id)
282 notify(
283 session,
284 user_id=user.id,
285 topic_action="event:create_approved" if payload.approved else "event:create_any",
286 key=str(payload.occurrence_id),
287 data=notification_data_pb2.EventCreate(
288 event=event_to_pb(session, occurrence, context),
289 inviting_user=user_model_to_pb(inviting_user, session, context),
290 nearby=True if node_id is None else None,
291 in_community=community_to_pb(session, event.parent_node, context) if node_id is not None else None,
292 ),
293 )
296def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload):
297 with session_scope() as session:
298 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
300 updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one_or_none()
302 subscribed_user_ids = [user.id for user in event.subscribers]
303 attending_user_ids = [user.user_id for user in occurrence.attendances]
305 for user_id in set(subscribed_user_ids + attending_user_ids):
306 if is_not_visible(session, user_id, updating_user.id):
307 continue
308 context = make_background_user_context(user_id=user_id)
309 notify(
310 session,
311 user_id=user_id,
312 topic_action="event:update",
313 key=str(payload.occurrence_id),
314 data=notification_data_pb2.EventUpdate(
315 event=event_to_pb(session, occurrence, context),
316 updating_user=user_model_to_pb(updating_user, session, context),
317 updated_items=payload.updated_items,
318 ),
319 )
322def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload):
323 with session_scope() as session:
324 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
326 cancelling_user = session.execute(
327 select(User).where(User.id == payload.cancelling_user_id)
328 ).scalar_one_or_none()
330 subscribed_user_ids = [user.id for user in event.subscribers]
331 attending_user_ids = [user.user_id for user in occurrence.attendances]
333 for user_id in set(subscribed_user_ids + attending_user_ids):
334 if is_not_visible(session, user_id, cancelling_user.id):
335 continue
336 context = make_background_user_context(user_id=user_id)
337 notify(
338 session,
339 user_id=user_id,
340 topic_action="event:cancel",
341 key=str(payload.occurrence_id),
342 data=notification_data_pb2.EventCancel(
343 event=event_to_pb(session, occurrence, context),
344 cancelling_user=user_model_to_pb(cancelling_user, session, context),
345 ),
346 )
349def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload):
350 with session_scope() as session:
351 event, occurrence = _get_event_and_occurrence_one(
352 session, occurrence_id=payload.occurrence_id, include_deleted=True
353 )
355 subscribed_user_ids = [user.id for user in event.subscribers]
356 attending_user_ids = [user.user_id for user in occurrence.attendances]
358 for user_id in set(subscribed_user_ids + attending_user_ids):
359 context = make_background_user_context(user_id=user_id)
360 notify(
361 session,
362 user_id=user_id,
363 topic_action="event:delete",
364 key=str(payload.occurrence_id),
365 data=notification_data_pb2.EventDelete(
366 event=event_to_pb(session, occurrence, context),
367 ),
368 )
371class Events(events_pb2_grpc.EventsServicer):
372 def CreateEvent(self, request, context, session):
373 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
374 if not user.has_completed_profile:
375 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "incomplete_profile_create_event")
376 if not request.title:
377 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_title")
378 if not request.content:
379 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_content")
380 if request.HasField("online_information"):
381 online = True
382 geom = None
383 address = None
384 if not request.online_information.link:
385 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_requires_link")
386 link = request.online_information.link
387 elif request.HasField("offline_information"):
388 online = False
389 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value
390 if not (
391 request.offline_information.address
392 and request.offline_information.lat
393 and request.offline_information.lng
394 ):
395 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_or_location")
396 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
397 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate")
398 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
399 address = request.offline_information.address
400 link = None
401 else:
402 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_location_or_link")
404 start_time = to_aware_datetime(request.start_time)
405 end_time = to_aware_datetime(request.end_time)
407 _check_occurrence_time_validity(start_time, end_time, context)
409 if request.parent_community_id:
410 parent_node = session.execute(
411 select(Node).where(Node.id == request.parent_community_id)
412 ).scalar_one_or_none()
414 if not parent_node.official_cluster.events_enabled:
415 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "events_not_enabled")
416 else:
417 if online:
418 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_missing_parent_community")
419 # parent community computed from geom
420 parent_node = get_parent_node_at_location(session, geom)
422 if not parent_node:
423 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "community_not_found")
425 if (
426 request.photo_key
427 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
428 ):
429 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "photo_not_found")
431 event = Event(
432 title=request.title,
433 parent_node_id=parent_node.id,
434 owner_user_id=context.user_id,
435 thread=Thread(),
436 creator_user_id=context.user_id,
437 )
438 session.add(event)
440 occurrence = EventOccurrence(
441 event=event,
442 content=request.content,
443 geom=geom,
444 address=address,
445 link=link,
446 photo_key=request.photo_key if request.photo_key != "" else None,
447 # timezone=timezone,
448 during=DateTimeTZRange(start_time, end_time),
449 creator_user_id=context.user_id,
450 )
451 session.add(occurrence)
453 session.add(
454 EventOrganizer(
455 user_id=context.user_id,
456 event=event,
457 )
458 )
460 session.add(
461 EventSubscription(
462 user_id=context.user_id,
463 event=event,
464 )
465 )
467 session.add(
468 EventOccurrenceAttendee(
469 user_id=context.user_id,
470 occurrence=occurrence,
471 attendee_status=AttendeeStatus.going,
472 )
473 )
475 session.commit()
477 if user.has_completed_profile:
478 queue_job(
479 session,
480 "generate_event_create_notifications",
481 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
482 inviting_user_id=user.id,
483 occurrence_id=occurrence.id,
484 approved=False,
485 ),
486 )
488 return event_to_pb(session, occurrence, context)
490 def ScheduleEvent(self, request, context, session):
491 if not request.content:
492 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_content")
493 if request.HasField("online_information"):
494 geom = None
495 address = None
496 link = request.online_information.link
497 elif request.HasField("offline_information"):
498 if not (
499 request.offline_information.address
500 and request.offline_information.lat
501 and request.offline_information.lng
502 ):
503 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_or_location")
504 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
505 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate")
506 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
507 address = request.offline_information.address
508 link = None
509 else:
510 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_event_address_location_or_link")
512 start_time = to_aware_datetime(request.start_time)
513 end_time = to_aware_datetime(request.end_time)
515 _check_occurrence_time_validity(start_time, end_time, context)
517 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
518 if not res:
519 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
521 event, occurrence = res
523 if not _can_edit_event(session, event, context.user_id):
524 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")
526 if occurrence.is_cancelled:
527 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")
529 if (
530 request.photo_key
531 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
532 ):
533 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "photo_not_found")
535 during = DateTimeTZRange(start_time, end_time)
537 # && is the overlap operator for ranges
538 if (
539 session.execute(
540 select(EventOccurrence.id)
541 .where(EventOccurrence.event_id == event.id)
542 .where(EventOccurrence.during.op("&&")(during))
543 .limit(1)
544 )
545 .scalars()
546 .one_or_none()
547 is not None
548 ):
549 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_overlap")
551 occurrence = EventOccurrence(
552 event=event,
553 content=request.content,
554 geom=geom,
555 address=address,
556 link=link,
557 photo_key=request.photo_key if request.photo_key != "" else None,
558 # timezone=timezone,
559 during=during,
560 creator_user_id=context.user_id,
561 )
562 session.add(occurrence)
564 session.add(
565 EventOccurrenceAttendee(
566 user_id=context.user_id,
567 occurrence=occurrence,
568 attendee_status=AttendeeStatus.going,
569 )
570 )
572 session.flush()
574 # TODO: notify
576 return event_to_pb(session, occurrence, context)
578 def UpdateEvent(self, request, context, session):
579 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
580 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
581 if not res:
582 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
584 event, occurrence = res
586 if not _can_edit_event(session, event, context.user_id):
587 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")
589 # the things that were updated and need to be notified about
590 notify_updated = []
592 if occurrence.is_cancelled:
593 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")
595 occurrence_update = {"last_edited": now()}
597 if request.HasField("title"):
598 notify_updated.append("title")
599 event.title = request.title.value
600 event.last_edited = now()
602 if request.HasField("content"):
603 notify_updated.append("content")
604 occurrence_update["content"] = request.content.value
606 if request.HasField("photo_key"):
607 occurrence_update["photo_key"] = request.photo_key.value
609 if request.HasField("online_information"):
610 notify_updated.append("location")
611 if not request.online_information.link:
612 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "online_event_requires_link")
613 occurrence_update["link"] = request.online_information.link
614 occurrence_update["geom"] = None
615 occurrence_update["address"] = None
616 elif request.HasField("offline_information"):
617 notify_updated.append("location")
618 occurrence_update["link"] = None
619 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
620 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_coordinate")
621 occurrence_update["geom"] = create_coordinate(
622 request.offline_information.lat, request.offline_information.lng
623 )
624 occurrence_update["address"] = request.offline_information.address
626 if request.HasField("start_time") or request.HasField("end_time"):
627 if request.update_all_future:
628 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "event_cant_update_all_times")
629 if request.HasField("start_time"):
630 notify_updated.append("start time")
631 start_time = to_aware_datetime(request.start_time)
632 else:
633 start_time = occurrence.start_time
634 if request.HasField("end_time"):
635 notify_updated.append("end time")
636 end_time = to_aware_datetime(request.end_time)
637 else:
638 end_time = occurrence.end_time
640 _check_occurrence_time_validity(start_time, end_time, context)
642 during = DateTimeTZRange(start_time, end_time)
644 # && is the overlap operator for ranges
645 if (
646 session.execute(
647 select(EventOccurrence.id)
648 .where(EventOccurrence.event_id == event.id)
649 .where(EventOccurrence.id != occurrence.id)
650 .where(EventOccurrence.during.op("&&")(during))
651 .limit(1)
652 )
653 .scalars()
654 .one_or_none()
655 is not None
656 ):
657 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_overlap")
659 occurrence_update["during"] = during
661 # TODO
662 # if request.HasField("timezone"):
663 # occurrence_update["timezone"] = request.timezone
665 # allow editing any event which hasn't ended more than 24 hours before now
666 # when editing all future events, we edit all which have not yet ended
668 if request.update_all_future:
669 session.execute(
670 update(EventOccurrence)
671 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
672 .where(EventOccurrence.start_time >= occurrence.start_time)
673 .values(occurrence_update)
674 .execution_options(synchronize_session=False)
675 )
676 else:
677 if occurrence.end_time < now() - timedelta(hours=24):
678 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")
679 session.execute(
680 update(EventOccurrence)
681 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
682 .where(EventOccurrence.id == occurrence.id)
683 .values(occurrence_update)
684 .execution_options(synchronize_session=False)
685 )
687 session.flush()
689 if notify_updated:
690 if request.should_notify:
691 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying")
693 queue_job(
694 session,
695 "generate_event_update_notifications",
696 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload(
697 updating_user_id=user.id,
698 occurrence_id=occurrence.id,
699 updated_items=notify_updated,
700 ),
701 )
702 else:
703 logger.info(
704 f"Fields {','.join(notify_updated)} updated in event {event.id=}, but skipping notifications"
705 )
707 # since we have synchronize_session=False, we have to refresh the object
708 session.refresh(occurrence)
710 return event_to_pb(session, occurrence, context)
712 def GetEvent(self, request, context, session):
713 occurrence = session.execute(
714 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
715 ).scalar_one_or_none()
717 if not occurrence:
718 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
720 return event_to_pb(session, occurrence, context)
722 def CancelEvent(self, request, context, session):
723 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
724 if not res:
725 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
727 event, occurrence = res
729 if not _can_edit_event(session, event, context.user_id):
730 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")
732 if occurrence.end_time < now() - timedelta(hours=24):
733 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_cancel_old_event")
735 occurrence.is_cancelled = True
737 queue_job(
738 session,
739 "generate_event_cancel_notifications",
740 payload=jobs_pb2.GenerateEventCancelNotificationsPayload(
741 cancelling_user_id=context.user_id,
742 occurrence_id=occurrence.id,
743 ),
744 )
746 return empty_pb2.Empty()
748 def RequestCommunityInvite(self, request, context, session):
749 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
750 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
751 if not res:
752 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
754 event, occurrence = res
756 if not _can_edit_event(session, event, context.user_id):
757 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")
759 if occurrence.is_cancelled:
760 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")
762 if occurrence.end_time < now() - timedelta(hours=24):
763 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")
765 this_user_reqs = [req for req in occurrence.community_invite_requests if req.user_id == context.user_id]
767 if len(this_user_reqs) > 0:
768 context.abort_with_error_code(
769 grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_requested"
770 )
772 approved_reqs = [req for req in occurrence.community_invite_requests if req.approved]
774 if len(approved_reqs) > 0:
775 context.abort_with_error_code(
776 grpc.StatusCode.FAILED_PRECONDITION, "event_community_invite_already_approved"
777 )
779 request = EventCommunityInviteRequest(
780 occurrence_id=request.event_id,
781 user_id=context.user_id,
782 )
783 session.add(request)
784 session.flush()
786 send_event_community_invite_request_email(session, request)
788 return empty_pb2.Empty()
790 def ListEventOccurrences(self, request, context, session):
791 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
792 # the page token is a unix timestamp of where we left off
793 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
794 occurrence = session.execute(
795 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
796 ).scalar_one_or_none()
797 if not occurrence:
798 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
800 occurrences = (
801 select(EventOccurrence).where(EventOccurrence.event_id == Event.id).where(~EventOccurrence.is_deleted)
802 )
804 if not request.include_cancelled:
805 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
807 if not request.past:
808 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
809 EventOccurrence.start_time.asc()
810 )
811 else:
812 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
813 EventOccurrence.start_time.desc()
814 )
816 occurrences = occurrences.limit(page_size + 1)
817 occurrences = session.execute(occurrences).scalars().all()
819 return events_pb2.ListEventOccurrencesRes(
820 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
821 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
822 )
824 def ListEventAttendees(self, request, context, session):
825 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
826 next_user_id = int(request.page_token) if request.page_token else 0
827 occurrence = session.execute(
828 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
829 ).scalar_one_or_none()
830 if not occurrence:
831 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
832 attendees = (
833 session.execute(
834 select(EventOccurrenceAttendee)
835 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
836 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
837 .where(EventOccurrenceAttendee.user_id >= next_user_id)
838 .order_by(EventOccurrenceAttendee.user_id)
839 .limit(page_size + 1)
840 )
841 .scalars()
842 .all()
843 )
844 return events_pb2.ListEventAttendeesRes(
845 attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]],
846 next_page_token=str(attendees[-1].user_id) if len(attendees) > page_size else None,
847 )
849 def ListEventSubscribers(self, request, context, session):
850 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
851 next_user_id = int(request.page_token) if request.page_token else 0
852 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
853 if not res:
854 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
855 event, occurrence = res
856 subscribers = (
857 session.execute(
858 select(EventSubscription)
859 .where_users_column_visible(context, EventSubscription.user_id)
860 .where(EventSubscription.event_id == event.id)
861 .where(EventSubscription.user_id >= next_user_id)
862 .order_by(EventSubscription.user_id)
863 .limit(page_size + 1)
864 )
865 .scalars()
866 .all()
867 )
868 return events_pb2.ListEventSubscribersRes(
869 subscriber_user_ids=[subscriber.user_id for subscriber in subscribers[:page_size]],
870 next_page_token=str(subscribers[-1].user_id) if len(subscribers) > page_size else None,
871 )
873 def ListEventOrganizers(self, request, context, session):
874 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
875 next_user_id = int(request.page_token) if request.page_token else 0
876 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
877 if not res:
878 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
879 event, occurrence = res
880 organizers = (
881 session.execute(
882 select(EventOrganizer)
883 .where_users_column_visible(context, EventOrganizer.user_id)
884 .where(EventOrganizer.event_id == event.id)
885 .where(EventOrganizer.user_id >= next_user_id)
886 .order_by(EventOrganizer.user_id)
887 .limit(page_size + 1)
888 )
889 .scalars()
890 .all()
891 )
892 return events_pb2.ListEventOrganizersRes(
893 organizer_user_ids=[organizer.user_id for organizer in organizers[:page_size]],
894 next_page_token=str(organizers[-1].user_id) if len(organizers) > page_size else None,
895 )
897 def TransferEvent(self, request, context, session):
898 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
899 if not res:
900 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
902 event, occurrence = res
904 if not _can_edit_event(session, event, context.user_id):
905 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_transfer_permission_denied")
907 if occurrence.is_cancelled:
908 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")
910 if occurrence.end_time < now() - timedelta(hours=24):
911 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")
913 if request.WhichOneof("new_owner") == "new_owner_group_id":
914 cluster = session.execute(
915 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
916 ).scalar_one_or_none()
917 elif request.WhichOneof("new_owner") == "new_owner_community_id":
918 cluster = session.execute(
919 select(Cluster)
920 .where(Cluster.parent_node_id == request.new_owner_community_id)
921 .where(Cluster.is_official_cluster)
922 ).scalar_one_or_none()
924 if not cluster:
925 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "group_or_community_not_found")
927 event.owner_user = None
928 event.owner_cluster = cluster
930 session.commit()
931 return event_to_pb(session, occurrence, context)
933 def SetEventSubscription(self, request, context, session):
934 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
935 if not res:
936 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
938 event, occurrence = res
940 if occurrence.is_cancelled:
941 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")
943 if occurrence.end_time < now() - timedelta(hours=24):
944 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")
946 current_subscription = session.execute(
947 select(EventSubscription)
948 .where(EventSubscription.user_id == context.user_id)
949 .where(EventSubscription.event_id == event.id)
950 ).scalar_one_or_none()
952 # if not subscribed, subscribe
953 if request.subscribe and not current_subscription:
954 session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
956 # if subscribed but unsubbing, remove subscription
957 if not request.subscribe and current_subscription:
958 session.delete(current_subscription)
960 session.flush()
962 return event_to_pb(session, occurrence, context)
964 def SetEventAttendance(self, request, context, session):
965 occurrence = session.execute(
966 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
967 ).scalar_one_or_none()
969 if not occurrence:
970 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
972 if occurrence.is_cancelled:
973 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")
975 if occurrence.end_time < now() - timedelta(hours=24):
976 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")
978 current_attendance = session.execute(
979 select(EventOccurrenceAttendee)
980 .where(EventOccurrenceAttendee.user_id == context.user_id)
981 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
982 ).scalar_one_or_none()
984 if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING:
985 if current_attendance:
986 session.delete(current_attendance)
987 # if unset/not going, nothing to do!
988 else:
989 if current_attendance:
990 current_attendance.attendee_status = attendancestate2sql[request.attendance_state]
991 else:
992 # create new
993 attendance = EventOccurrenceAttendee(
994 user_id=context.user_id,
995 occurrence_id=occurrence.id,
996 attendee_status=attendancestate2sql[request.attendance_state],
997 )
998 session.add(attendance)
1000 session.flush()
1002 return event_to_pb(session, occurrence, context)
1004 def ListMyEvents(self, request, context, session):
1005 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
1006 # the page token is a unix timestamp of where we left off
1007 page_token = (
1008 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
1009 )
1010 # the page number is the page number we are on
1011 page_number = request.page_number or 1
1012 # Calculate the offset for pagination
1013 offset = (page_number - 1) * page_size
1014 occurrences = (
1015 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
1016 )
1018 include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities)
1019 include_subscribed = request.subscribed or include_all
1020 include_organizing = request.organizing or include_all
1021 include_attending = request.attending or include_all
1022 include_my_communities = request.my_communities or include_all
1024 where_ = []
1026 if include_subscribed:
1027 occurrences = occurrences.outerjoin(
1028 EventSubscription,
1029 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
1030 )
1031 where_.append(EventSubscription.user_id != None)
1032 if include_organizing:
1033 occurrences = occurrences.outerjoin(
1034 EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id)
1035 )
1036 where_.append(EventOrganizer.user_id != None)
1037 if include_attending:
1038 occurrences = occurrences.outerjoin(
1039 EventOccurrenceAttendee,
1040 and_(
1041 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
1042 EventOccurrenceAttendee.user_id == context.user_id,
1043 ),
1044 )
1045 where_.append(EventOccurrenceAttendee.user_id != None)
1046 if include_my_communities:
1047 my_communities = (
1048 session.execute(
1049 select(Node.id)
1050 .join(Cluster, Cluster.parent_node_id == Node.id)
1051 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
1052 .where(ClusterSubscription.user_id == context.user_id)
1053 .where(Cluster.is_official_cluster)
1054 .order_by(Node.id)
1055 .limit(100000)
1056 )
1057 .scalars()
1058 .all()
1059 )
1060 where_.append(Event.parent_node_id.in_(my_communities))
1062 occurrences = occurrences.where(or_(*where_))
1064 if not request.include_cancelled:
1065 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
1067 if not request.past:
1068 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
1069 EventOccurrence.start_time.asc()
1070 )
1071 else:
1072 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
1073 EventOccurrence.start_time.desc()
1074 )
1075 # Count the total number of items for pagination
1076 total_items = session.execute(select(func.count()).select_from(occurrences.subquery())).scalar()
1077 # Apply pagination by page number
1078 occurrences = (
1079 occurrences.offset(offset).limit(page_size) if request.page_number else occurrences.limit(page_size + 1)
1080 )
1081 occurrences = session.execute(occurrences).scalars().all()
1083 return events_pb2.ListMyEventsRes(
1084 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
1085 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
1086 total_items=total_items,
1087 )
1089 def ListAllEvents(self, request, context, session):
1090 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
1091 # the page token is a unix timestamp of where we left off
1092 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
1094 occurrences = (
1095 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
1096 )
1098 if not request.include_cancelled:
1099 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
1101 if not request.past:
1102 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
1103 EventOccurrence.start_time.asc()
1104 )
1105 else:
1106 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
1107 EventOccurrence.start_time.desc()
1108 )
1110 occurrences = occurrences.limit(page_size + 1)
1111 occurrences = session.execute(occurrences).scalars().all()
1113 return events_pb2.ListAllEventsRes(
1114 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
1115 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
1116 )
1118 def InviteEventOrganizer(self, request, context, session):
1119 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
1120 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
1121 if not res:
1122 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
1124 event, occurrence = res
1126 if not _can_edit_event(session, event, context.user_id):
1127 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_edit_permission_denied")
1129 if occurrence.is_cancelled:
1130 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")
1132 if occurrence.end_time < now() - timedelta(hours=24):
1133 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")
1135 if not session.execute(
1136 select(User).where_users_visible(context).where(User.id == request.user_id)
1137 ).scalar_one_or_none():
1138 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1140 session.add(
1141 EventOrganizer(
1142 user_id=request.user_id,
1143 event=event,
1144 )
1145 )
1146 session.flush()
1148 other_user_context = make_background_user_context(user_id=request.user_id)
1150 notify(
1151 session,
1152 user_id=request.user_id,
1153 topic_action="event:invite_organizer",
1154 key=str(event.id),
1155 data=notification_data_pb2.EventInviteOrganizer(
1156 event=event_to_pb(session, occurrence, other_user_context),
1157 inviting_user=user_model_to_pb(user, session, other_user_context),
1158 ),
1159 )
1161 return empty_pb2.Empty()
1163 def RemoveEventOrganizer(self, request, context, session):
1164 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
1165 if not res:
1166 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
1168 event, occurrence = res
1170 if occurrence.is_cancelled:
1171 context.abort_with_error_code(grpc.StatusCode.PERMISSION_DENIED, "event_cant_update_cancelled_event")
1173 if occurrence.end_time < now() - timedelta(hours=24):
1174 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_update_old_event")
1176 # Determine which user to remove
1177 user_id_to_remove = request.user_id.value if request.HasField("user_id") else context.user_id
1179 # Check if the target user is the event owner (only after permission check)
1180 if event.owner_user_id == user_id_to_remove:
1181 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_cant_remove_owner_as_organizer")
1183 # Check permissions: either an organizer removing an organizer OR you're the event owner
1184 if not _can_edit_event(session, event, context.user_id):
1185 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_edit_permission_denied")
1187 # Find the organizer to remove
1188 organizer_to_remove = session.execute(
1189 select(EventOrganizer)
1190 .where(EventOrganizer.user_id == user_id_to_remove)
1191 .where(EventOrganizer.event_id == event.id)
1192 ).scalar_one_or_none()
1194 if not organizer_to_remove:
1195 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "event_not_an_organizer")
1197 session.delete(organizer_to_remove)
1199 return empty_pb2.Empty()