Coverage for src/couchers/servicers/events.py: 84%
500 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-08-28 14:55 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from psycopg2.extras import DateTimeTZRange
7from sqlalchemy.sql import and_, func, or_, select, update
9from couchers import errors
10from couchers.context import make_background_user_context
11from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope
12from couchers.jobs.enqueue import queue_job
13from couchers.models import (
14 AttendeeStatus,
15 Cluster,
16 ClusterSubscription,
17 Event,
18 EventCommunityInviteRequest,
19 EventOccurrence,
20 EventOccurrenceAttendee,
21 EventOrganizer,
22 EventSubscription,
23 Node,
24 Thread,
25 Upload,
26 User,
27)
28from couchers.notifications.notify import notify
29from couchers.servicers.api import user_model_to_pb
30from couchers.servicers.blocking import is_not_visible
31from couchers.servicers.threads import thread_to_pb
32from couchers.sql import couchers_select as select
33from couchers.tasks import send_event_community_invite_request_email
34from couchers.utils import (
35 Timestamp_from_datetime,
36 create_coordinate,
37 dt_from_millis,
38 millis_from_dt,
39 now,
40 to_aware_datetime,
41)
42from proto import events_pb2, events_pb2_grpc, notification_data_pb2
43from proto.internal import jobs_pb2
45logger = logging.getLogger(__name__)
47attendancestate2sql = {
48 events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
49 events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
50 events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
51}
53attendancestate2api = {
54 None: events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING,
55 AttendeeStatus.going: events_pb2.AttendanceState.ATTENDANCE_STATE_GOING,
56 AttendeeStatus.maybe: events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE,
57}
59MAX_PAGINATION_LENGTH = 25
62def _is_event_owner(event: Event, user_id):
63 """
64 Checks whether the user can act as an owner of the event
65 """
66 if event.owner_user:
67 return event.owner_user_id == user_id
68 # otherwise owned by a cluster
69 return event.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None
72def _is_event_organizer(event: Event, user_id):
73 """
74 Checks whether the user is as an organizer of the event
75 """
76 return event.organizers.where(EventOrganizer.user_id == user_id).one_or_none() is not None
79def _can_moderate_event(session, event: Event, user_id):
80 # if the event is owned by a cluster, then any moderator of that cluster can moderate this event
81 if event.owner_cluster is not None and can_moderate_node(session, user_id, event.owner_cluster.parent_node_id):
82 return True
84 # finally check if the user can moderate the parent node of the cluster
85 return can_moderate_node(session, user_id, event.parent_node_id)
88def _can_edit_event(session, event, user_id):
89 return (
90 _is_event_owner(event, user_id)
91 or _is_event_organizer(event, user_id)
92 or _can_moderate_event(session, event, user_id)
93 )
96def event_to_pb(session, occurrence: EventOccurrence, context):
97 event = occurrence.event
99 next_occurrence = (
100 event.occurrences.where(EventOccurrence.end_time >= now())
101 .order_by(EventOccurrence.end_time.asc())
102 .limit(1)
103 .one_or_none()
104 )
106 owner_community_id = None
107 owner_group_id = None
108 if event.owner_cluster:
109 if event.owner_cluster.is_official_cluster:
110 owner_community_id = event.owner_cluster.parent_node_id
111 else:
112 owner_group_id = event.owner_cluster.id
114 attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == context.user_id).one_or_none()
115 attendance_state = attendance.attendee_status if attendance else None
117 can_moderate = _can_moderate_event(session, event, context.user_id)
118 can_edit = _can_edit_event(session, event, context.user_id)
120 going_count = session.execute(
121 select(func.count())
122 .select_from(EventOccurrenceAttendee)
123 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
124 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
125 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.going)
126 ).scalar_one()
127 maybe_count = session.execute(
128 select(func.count())
129 .select_from(EventOccurrenceAttendee)
130 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
131 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
132 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe)
133 ).scalar_one()
135 organizer_count = session.execute(
136 select(func.count())
137 .select_from(EventOrganizer)
138 .where_users_column_visible(context, EventOrganizer.user_id)
139 .where(EventOrganizer.event_id == event.id)
140 ).scalar_one()
141 subscriber_count = session.execute(
142 select(func.count())
143 .select_from(EventSubscription)
144 .where_users_column_visible(context, EventSubscription.user_id)
145 .where(EventSubscription.event_id == event.id)
146 ).scalar_one()
148 return events_pb2.Event(
149 event_id=occurrence.id,
150 is_next=False if not next_occurrence else occurrence.id == next_occurrence.id,
151 is_cancelled=occurrence.is_cancelled,
152 is_deleted=occurrence.is_deleted,
153 title=event.title,
154 slug=event.slug,
155 content=occurrence.content,
156 photo_url=occurrence.photo.full_url if occurrence.photo else None,
157 online_information=(
158 events_pb2.OnlineEventInformation(
159 link=occurrence.link,
160 )
161 if occurrence.link
162 else None
163 ),
164 offline_information=(
165 events_pb2.OfflineEventInformation(
166 lat=occurrence.coordinates[0],
167 lng=occurrence.coordinates[1],
168 address=occurrence.address,
169 )
170 if occurrence.geom
171 else None
172 ),
173 created=Timestamp_from_datetime(occurrence.created),
174 last_edited=Timestamp_from_datetime(occurrence.last_edited),
175 creator_user_id=occurrence.creator_user_id,
176 start_time=Timestamp_from_datetime(occurrence.start_time),
177 end_time=Timestamp_from_datetime(occurrence.end_time),
178 timezone=occurrence.timezone,
179 start_time_display=str(occurrence.start_time),
180 end_time_display=str(occurrence.end_time),
181 attendance_state=attendancestate2api[attendance_state],
182 organizer=event.organizers.where(EventOrganizer.user_id == context.user_id).one_or_none() is not None,
183 subscriber=event.subscribers.where(EventSubscription.user_id == context.user_id).one_or_none() is not None,
184 going_count=going_count,
185 maybe_count=maybe_count,
186 organizer_count=organizer_count,
187 subscriber_count=subscriber_count,
188 owner_user_id=event.owner_user_id,
189 owner_community_id=owner_community_id,
190 owner_group_id=owner_group_id,
191 thread=thread_to_pb(session, event.thread_id),
192 can_edit=can_edit,
193 can_moderate=can_moderate,
194 )
197def _get_event_and_occurrence_query(occurrence_id, include_deleted: bool):
198 query = (
199 select(Event, EventOccurrence)
200 .where(EventOccurrence.id == occurrence_id)
201 .where(EventOccurrence.event_id == Event.id)
202 )
204 if not include_deleted:
205 query = query.where(~EventOccurrence.is_deleted)
207 return query
210def _get_event_and_occurrence_one(
211 session, occurrence_id, include_deleted: bool = False
212) -> tuple[Event, EventOccurrence]:
213 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one()
216def _get_event_and_occurrence_one_or_none(
217 session, occurrence_id, include_deleted: bool = False
218) -> tuple[Event, EventOccurrence] | None:
219 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one_or_none()
222def _check_occurrence_time_validity(start_time, end_time, context):
223 if start_time < now():
224 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_IN_PAST)
225 if end_time < start_time:
226 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_ENDS_BEFORE_STARTS)
227 if end_time - start_time > timedelta(days=7):
228 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_LONG)
229 if start_time - now() > timedelta(days=365):
230 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_FAR_IN_FUTURE)
233def get_users_to_notify_for_new_event(session, occurrence):
234 """
235 Returns the users to notify, as well as the community id that is being notified (None if based on geo search)
236 """
237 cluster = occurrence.event.parent_node.official_cluster
238 if cluster.parent_node_id == 1:
239 logger.info("The Global Community is too big for email notifications.")
240 return [], occurrence.event.parent_node_id
241 elif occurrence.creator_user in cluster.admins or cluster.is_leaf:
242 return list(cluster.members.where(User.is_visible)), occurrence.event.parent_node_id
243 else:
244 max_radius = 20000 # m
245 users = (
246 session.execute(
247 select(User)
248 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
249 .where(User.is_visible)
250 .where(ClusterSubscription.cluster_id == cluster.id)
251 .where(func.ST_DWithin(User.geom, occurrence.geom, max_radius / 111111))
252 )
253 .scalars()
254 .all()
255 )
256 return users, None
259def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload):
260 """
261 Background job to generated/fan out event notifications
262 """
263 from couchers.servicers.communities import community_to_pb
265 logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}")
267 with session_scope() as session:
268 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
269 creator = occurrence.creator_user
271 users, node_id = get_users_to_notify_for_new_event(session, occurrence)
273 inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none()
275 if not inviting_user:
276 logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?")
277 return
279 for user in users:
280 if is_not_visible(session, user.id, creator.id):
281 continue
282 context = make_background_user_context(user_id=user.id)
283 notify(
284 session,
285 user_id=user.id,
286 topic_action="event:create_approved" if payload.approved else "event:create_any",
287 key=payload.occurrence_id,
288 data=notification_data_pb2.EventCreate(
289 event=event_to_pb(session, occurrence, context),
290 inviting_user=user_model_to_pb(inviting_user, session, context),
291 nearby=True if node_id is None else None,
292 in_community=community_to_pb(session, event.parent_node, context) if node_id is not None else None,
293 ),
294 )
297def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload):
298 with session_scope() as session:
299 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
301 updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one_or_none()
303 subscribed_user_ids = [user.id for user in event.subscribers]
304 attending_user_ids = [user.user_id for user in occurrence.attendances]
306 for user_id in set(subscribed_user_ids + attending_user_ids):
307 if is_not_visible(session, user_id, updating_user.id):
308 continue
309 context = make_background_user_context(user_id=user_id)
310 notify(
311 session,
312 user_id=user_id,
313 topic_action="event:update",
314 key=payload.occurrence_id,
315 data=notification_data_pb2.EventUpdate(
316 event=event_to_pb(session, occurrence, context),
317 updating_user=user_model_to_pb(updating_user, session, context),
318 updated_items=payload.updated_items,
319 ),
320 )
323def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload):
324 with session_scope() as session:
325 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
327 cancelling_user = session.execute(
328 select(User).where(User.id == payload.cancelling_user_id)
329 ).scalar_one_or_none()
331 subscribed_user_ids = [user.id for user in event.subscribers]
332 attending_user_ids = [user.user_id for user in occurrence.attendances]
334 for user_id in set(subscribed_user_ids + attending_user_ids):
335 if is_not_visible(session, user_id, cancelling_user.id):
336 continue
337 context = make_background_user_context(user_id=user_id)
338 notify(
339 session,
340 user_id=user_id,
341 topic_action="event:cancel",
342 key=payload.occurrence_id,
343 data=notification_data_pb2.EventCancel(
344 event=event_to_pb(session, occurrence, context),
345 cancelling_user=user_model_to_pb(cancelling_user, session, context),
346 ),
347 )
350def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload):
351 with session_scope() as session:
352 event, occurrence = _get_event_and_occurrence_one(
353 session, occurrence_id=payload.occurrence_id, include_deleted=True
354 )
356 subscribed_user_ids = [user.id for user in event.subscribers]
357 attending_user_ids = [user.user_id for user in occurrence.attendances]
359 for user_id in set(subscribed_user_ids + attending_user_ids):
360 context = make_background_user_context(user_id=user_id)
361 notify(
362 session,
363 user_id=user_id,
364 topic_action="event:delete",
365 key=payload.occurrence_id,
366 data=notification_data_pb2.EventDelete(
367 event=event_to_pb(session, occurrence, context),
368 ),
369 )
372class Events(events_pb2_grpc.EventsServicer):
373 def CreateEvent(self, request, context, session):
374 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
375 if not user.has_completed_profile:
376 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_CREATE_EVENT)
377 if not request.title:
378 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_TITLE)
379 if not request.content:
380 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
381 if request.HasField("online_information"):
382 online = True
383 geom = None
384 address = None
385 if not request.online_information.link:
386 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
387 link = request.online_information.link
388 elif request.HasField("offline_information"):
389 online = False
390 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value
391 if not (
392 request.offline_information.address
393 and request.offline_information.lat
394 and request.offline_information.lng
395 ):
396 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
397 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
398 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
399 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
400 address = request.offline_information.address
401 link = None
402 else:
403 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)
405 start_time = to_aware_datetime(request.start_time)
406 end_time = to_aware_datetime(request.end_time)
408 _check_occurrence_time_validity(start_time, end_time, context)
410 if request.parent_community_id:
411 parent_node = session.execute(
412 select(Node).where(Node.id == request.parent_community_id)
413 ).scalar_one_or_none()
415 if not parent_node.official_cluster.events_enabled:
416 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENTS_NOT_ENABLED)
417 else:
418 if online:
419 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_MISSING_PARENT_COMMUNITY)
420 # parent community computed from geom
421 parent_node = get_parent_node_at_location(session, geom)
423 if not parent_node:
424 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)
426 if (
427 request.photo_key
428 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
429 ):
430 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
432 event = Event(
433 title=request.title,
434 parent_node_id=parent_node.id,
435 owner_user_id=context.user_id,
436 thread=Thread(),
437 creator_user_id=context.user_id,
438 )
439 session.add(event)
441 occurrence = EventOccurrence(
442 event=event,
443 content=request.content,
444 geom=geom,
445 address=address,
446 link=link,
447 photo_key=request.photo_key if request.photo_key != "" else None,
448 # timezone=timezone,
449 during=DateTimeTZRange(start_time, end_time),
450 creator_user_id=context.user_id,
451 )
452 session.add(occurrence)
454 session.add(
455 EventOrganizer(
456 user_id=context.user_id,
457 event=event,
458 )
459 )
461 session.add(
462 EventSubscription(
463 user_id=context.user_id,
464 event=event,
465 )
466 )
468 session.add(
469 EventOccurrenceAttendee(
470 user_id=context.user_id,
471 occurrence=occurrence,
472 attendee_status=AttendeeStatus.going,
473 )
474 )
476 session.commit()
478 if user.has_completed_profile:
479 queue_job(
480 session,
481 "generate_event_create_notifications",
482 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
483 inviting_user_id=user.id,
484 occurrence_id=occurrence.id,
485 approved=False,
486 ),
487 )
489 return event_to_pb(session, occurrence, context)
491 def ScheduleEvent(self, request, context, session):
492 if not request.content:
493 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
494 if request.HasField("online_information"):
495 geom = None
496 address = None
497 link = request.online_information.link
498 elif request.HasField("offline_information"):
499 if not (
500 request.offline_information.address
501 and request.offline_information.lat
502 and request.offline_information.lng
503 ):
504 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
505 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
506 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
507 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
508 address = request.offline_information.address
509 link = None
510 else:
511 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)
513 start_time = to_aware_datetime(request.start_time)
514 end_time = to_aware_datetime(request.end_time)
516 _check_occurrence_time_validity(start_time, end_time, context)
518 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
519 if not res:
520 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
522 event, occurrence = res
524 if not _can_edit_event(session, event, context.user_id):
525 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
527 if occurrence.is_cancelled:
528 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
530 if (
531 request.photo_key
532 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
533 ):
534 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
536 during = DateTimeTZRange(start_time, end_time)
538 # && is the overlap operator for ranges
539 if (
540 session.execute(
541 select(EventOccurrence.id)
542 .where(EventOccurrence.event_id == event.id)
543 .where(EventOccurrence.during.op("&&")(during))
544 .limit(1)
545 )
546 .scalars()
547 .one_or_none()
548 is not None
549 ):
550 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)
552 occurrence = EventOccurrence(
553 event=event,
554 content=request.content,
555 geom=geom,
556 address=address,
557 link=link,
558 photo_key=request.photo_key if request.photo_key != "" else None,
559 # timezone=timezone,
560 during=during,
561 creator_user_id=context.user_id,
562 )
563 session.add(occurrence)
565 session.add(
566 EventOccurrenceAttendee(
567 user_id=context.user_id,
568 occurrence=occurrence,
569 attendee_status=AttendeeStatus.going,
570 )
571 )
573 session.flush()
575 # TODO: notify
577 return event_to_pb(session, occurrence, context)
579 def UpdateEvent(self, request, context, session):
580 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
581 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
582 if not res:
583 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
585 event, occurrence = res
587 if not _can_edit_event(session, event, context.user_id):
588 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
590 # the things that were updated and need to be notified about
591 notify_updated = []
593 if occurrence.is_cancelled:
594 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
596 occurrence_update = {"last_edited": now()}
598 if request.HasField("title"):
599 notify_updated.append("title")
600 event.title = request.title.value
601 event.last_edited = now()
603 if request.HasField("content"):
604 notify_updated.append("content")
605 occurrence_update["content"] = request.content.value
607 if request.HasField("photo_key"):
608 occurrence_update["photo_key"] = request.photo_key.value
610 if request.HasField("online_information"):
611 notify_updated.append("location")
612 if not request.online_information.link:
613 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
614 occurrence_update["link"] = request.online_information.link
615 occurrence_update["geom"] = None
616 occurrence_update["address"] = None
617 elif request.HasField("offline_information"):
618 notify_updated.append("location")
619 occurrence_update["link"] = None
620 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
621 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
622 occurrence_update["geom"] = create_coordinate(
623 request.offline_information.lat, request.offline_information.lng
624 )
625 occurrence_update["address"] = request.offline_information.address
627 if request.HasField("start_time") or request.HasField("end_time"):
628 if request.update_all_future:
629 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_CANT_UPDATE_ALL_TIMES)
630 if request.HasField("start_time"):
631 notify_updated.append("start time")
632 start_time = to_aware_datetime(request.start_time)
633 else:
634 start_time = occurrence.start_time
635 if request.HasField("end_time"):
636 notify_updated.append("end time")
637 end_time = to_aware_datetime(request.end_time)
638 else:
639 end_time = occurrence.end_time
641 _check_occurrence_time_validity(start_time, end_time, context)
643 during = DateTimeTZRange(start_time, end_time)
645 # && is the overlap operator for ranges
646 if (
647 session.execute(
648 select(EventOccurrence.id)
649 .where(EventOccurrence.event_id == event.id)
650 .where(EventOccurrence.id != occurrence.id)
651 .where(EventOccurrence.during.op("&&")(during))
652 .limit(1)
653 )
654 .scalars()
655 .one_or_none()
656 is not None
657 ):
658 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)
660 occurrence_update["during"] = during
662 # TODO
663 # if request.HasField("timezone"):
664 # occurrence_update["timezone"] = request.timezone
666 # allow editing any event which hasn't ended more than 24 hours before now
667 # when editing all future events, we edit all which have not yet ended
669 if request.update_all_future:
670 session.execute(
671 update(EventOccurrence)
672 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
673 .where(EventOccurrence.start_time >= occurrence.start_time)
674 .values(occurrence_update)
675 .execution_options(synchronize_session=False)
676 )
677 else:
678 if occurrence.end_time < now() - timedelta(hours=24):
679 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
680 session.execute(
681 update(EventOccurrence)
682 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
683 .where(EventOccurrence.id == occurrence.id)
684 .values(occurrence_update)
685 .execution_options(synchronize_session=False)
686 )
688 session.flush()
690 if notify_updated:
691 if request.should_notify:
692 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying")
694 queue_job(
695 session,
696 "generate_event_update_notifications",
697 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload(
698 updating_user_id=user.id,
699 occurrence_id=occurrence.id,
700 updated_items=notify_updated,
701 ),
702 )
703 else:
704 logger.info(
705 f"Fields {','.join(notify_updated)} updated in event {event.id=}, but skipping notifications"
706 )
708 # since we have synchronize_session=False, we have to refresh the object
709 session.refresh(occurrence)
711 return event_to_pb(session, occurrence, context)
713 def GetEvent(self, request, context, session):
714 occurrence = session.execute(
715 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
716 ).scalar_one_or_none()
718 if not occurrence:
719 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
721 return event_to_pb(session, occurrence, context)
723 def CancelEvent(self, request, context, session):
724 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
725 if not res:
726 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
728 event, occurrence = res
730 if not _can_edit_event(session, event, context.user_id):
731 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
733 if occurrence.end_time < now() - timedelta(hours=24):
734 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_CANCEL_OLD_EVENT)
736 occurrence.is_cancelled = True
738 queue_job(
739 session,
740 "generate_event_cancel_notifications",
741 payload=jobs_pb2.GenerateEventCancelNotificationsPayload(
742 cancelling_user_id=context.user_id,
743 occurrence_id=occurrence.id,
744 ),
745 )
747 return empty_pb2.Empty()
749 def RequestCommunityInvite(self, request, context, session):
750 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
751 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
752 if not res:
753 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
755 event, occurrence = res
757 if not _can_edit_event(session, event, context.user_id):
758 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
760 if occurrence.is_cancelled:
761 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
763 if occurrence.end_time < now() - timedelta(hours=24):
764 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
766 this_user_reqs = [req for req in occurrence.community_invite_requests if req.user_id == context.user_id]
768 if len(this_user_reqs) > 0:
769 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_REQUESTED)
771 approved_reqs = [req for req in occurrence.community_invite_requests if req.approved]
773 if len(approved_reqs) > 0:
774 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_APPROVED)
776 request = EventCommunityInviteRequest(
777 occurrence_id=request.event_id,
778 user_id=context.user_id,
779 )
780 session.add(request)
781 session.flush()
783 send_event_community_invite_request_email(session, request)
785 return empty_pb2.Empty()
787 def ListEventOccurrences(self, request, context, session):
788 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
789 # the page token is a unix timestamp of where we left off
790 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
791 occurrence = session.execute(
792 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
793 ).scalar_one_or_none()
794 if not occurrence:
795 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
797 occurrences = (
798 select(EventOccurrence).where(EventOccurrence.event_id == Event.id).where(~EventOccurrence.is_deleted)
799 )
801 if not request.include_cancelled:
802 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
804 if not request.past:
805 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
806 EventOccurrence.start_time.asc()
807 )
808 else:
809 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
810 EventOccurrence.start_time.desc()
811 )
813 occurrences = occurrences.limit(page_size + 1)
814 occurrences = session.execute(occurrences).scalars().all()
816 return events_pb2.ListEventOccurrencesRes(
817 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
818 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
819 )
821 def ListEventAttendees(self, request, context, session):
822 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
823 next_user_id = int(request.page_token) if request.page_token else 0
824 occurrence = session.execute(
825 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
826 ).scalar_one_or_none()
827 if not occurrence:
828 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
829 attendees = (
830 session.execute(
831 select(EventOccurrenceAttendee)
832 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
833 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
834 .where(EventOccurrenceAttendee.user_id >= next_user_id)
835 .order_by(EventOccurrenceAttendee.user_id)
836 .limit(page_size + 1)
837 )
838 .scalars()
839 .all()
840 )
841 return events_pb2.ListEventAttendeesRes(
842 attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]],
843 next_page_token=str(attendees[-1].user_id) if len(attendees) > page_size else None,
844 )
846 def ListEventSubscribers(self, request, context, session):
847 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
848 next_user_id = int(request.page_token) if request.page_token else 0
849 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
850 if not res:
851 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
852 event, occurrence = res
853 subscribers = (
854 session.execute(
855 select(EventSubscription)
856 .where_users_column_visible(context, EventSubscription.user_id)
857 .where(EventSubscription.event_id == event.id)
858 .where(EventSubscription.user_id >= next_user_id)
859 .order_by(EventSubscription.user_id)
860 .limit(page_size + 1)
861 )
862 .scalars()
863 .all()
864 )
865 return events_pb2.ListEventSubscribersRes(
866 subscriber_user_ids=[subscriber.user_id for subscriber in subscribers[:page_size]],
867 next_page_token=str(subscribers[-1].user_id) if len(subscribers) > page_size else None,
868 )
870 def ListEventOrganizers(self, request, context, session):
871 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
872 next_user_id = int(request.page_token) if request.page_token else 0
873 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
874 if not res:
875 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
876 event, occurrence = res
877 organizers = (
878 session.execute(
879 select(EventOrganizer)
880 .where_users_column_visible(context, EventOrganizer.user_id)
881 .where(EventOrganizer.event_id == event.id)
882 .where(EventOrganizer.user_id >= next_user_id)
883 .order_by(EventOrganizer.user_id)
884 .limit(page_size + 1)
885 )
886 .scalars()
887 .all()
888 )
889 return events_pb2.ListEventOrganizersRes(
890 organizer_user_ids=[organizer.user_id for organizer in organizers[:page_size]],
891 next_page_token=str(organizers[-1].user_id) if len(organizers) > page_size else None,
892 )
894 def TransferEvent(self, request, context, session):
895 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
896 if not res:
897 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
899 event, occurrence = res
901 if not _can_edit_event(session, event, context.user_id):
902 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_TRANSFER_PERMISSION_DENIED)
904 if occurrence.is_cancelled:
905 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
907 if occurrence.end_time < now() - timedelta(hours=24):
908 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
910 if request.WhichOneof("new_owner") == "new_owner_group_id":
911 cluster = session.execute(
912 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
913 ).scalar_one_or_none()
914 elif request.WhichOneof("new_owner") == "new_owner_community_id":
915 cluster = session.execute(
916 select(Cluster)
917 .where(Cluster.parent_node_id == request.new_owner_community_id)
918 .where(Cluster.is_official_cluster)
919 ).scalar_one_or_none()
921 if not cluster:
922 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
924 event.owner_user = None
925 event.owner_cluster = cluster
927 session.commit()
928 return event_to_pb(session, occurrence, context)
930 def SetEventSubscription(self, request, context, session):
931 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
932 if not res:
933 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
935 event, occurrence = res
937 if occurrence.is_cancelled:
938 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
940 if occurrence.end_time < now() - timedelta(hours=24):
941 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
943 current_subscription = session.execute(
944 select(EventSubscription)
945 .where(EventSubscription.user_id == context.user_id)
946 .where(EventSubscription.event_id == event.id)
947 ).scalar_one_or_none()
949 # if not subscribed, subscribe
950 if request.subscribe and not current_subscription:
951 session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
953 # if subscribed but unsubbing, remove subscription
954 if not request.subscribe and current_subscription:
955 session.delete(current_subscription)
957 session.flush()
959 return event_to_pb(session, occurrence, context)
961 def SetEventAttendance(self, request, context, session):
962 occurrence = session.execute(
963 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
964 ).scalar_one_or_none()
966 if not occurrence:
967 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
969 if occurrence.is_cancelled:
970 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
972 if occurrence.end_time < now() - timedelta(hours=24):
973 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
975 current_attendance = session.execute(
976 select(EventOccurrenceAttendee)
977 .where(EventOccurrenceAttendee.user_id == context.user_id)
978 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
979 ).scalar_one_or_none()
981 if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING:
982 if current_attendance:
983 session.delete(current_attendance)
984 # if unset/not going, nothing to do!
985 else:
986 if current_attendance:
987 current_attendance.attendee_status = attendancestate2sql[request.attendance_state]
988 else:
989 # create new
990 attendance = EventOccurrenceAttendee(
991 user_id=context.user_id,
992 occurrence_id=occurrence.id,
993 attendee_status=attendancestate2sql[request.attendance_state],
994 )
995 session.add(attendance)
997 session.flush()
999 return event_to_pb(session, occurrence, context)
1001 def ListMyEvents(self, request, context, session):
1002 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
1003 # the page token is a unix timestamp of where we left off
1004 page_token = (
1005 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
1006 )
1007 # the page number is the page number we are on
1008 page_number = request.page_number or 1
1009 # Calculate the offset for pagination
1010 offset = (page_number - 1) * page_size
1011 occurrences = (
1012 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
1013 )
1015 include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities)
1016 include_subscribed = request.subscribed or include_all
1017 include_organizing = request.organizing or include_all
1018 include_attending = request.attending or include_all
1019 include_my_communities = request.my_communities or include_all
1021 where_ = []
1023 if include_subscribed:
1024 occurrences = occurrences.outerjoin(
1025 EventSubscription,
1026 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
1027 )
1028 where_.append(EventSubscription.user_id != None)
1029 if include_organizing:
1030 occurrences = occurrences.outerjoin(
1031 EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id)
1032 )
1033 where_.append(EventOrganizer.user_id != None)
1034 if include_attending:
1035 occurrences = occurrences.outerjoin(
1036 EventOccurrenceAttendee,
1037 and_(
1038 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
1039 EventOccurrenceAttendee.user_id == context.user_id,
1040 ),
1041 )
1042 where_.append(EventOccurrenceAttendee.user_id != None)
1043 if include_my_communities:
1044 my_communities = (
1045 session.execute(
1046 select(Node.id)
1047 .join(Cluster, Cluster.parent_node_id == Node.id)
1048 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
1049 .where(ClusterSubscription.user_id == context.user_id)
1050 .where(Cluster.is_official_cluster)
1051 .order_by(Node.id)
1052 .limit(100000)
1053 )
1054 .scalars()
1055 .all()
1056 )
1057 where_.append(Event.parent_node_id.in_(my_communities))
1059 occurrences = occurrences.where(or_(*where_))
1061 if not request.include_cancelled:
1062 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
1064 if not request.past:
1065 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
1066 EventOccurrence.start_time.asc()
1067 )
1068 else:
1069 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
1070 EventOccurrence.start_time.desc()
1071 )
1072 # Count the total number of items for pagination
1073 total_items = session.execute(select(func.count()).select_from(occurrences.subquery())).scalar()
1074 # Apply pagination by page number
1075 occurrences = (
1076 occurrences.offset(offset).limit(page_size) if request.page_number else occurrences.limit(page_size + 1)
1077 )
1078 occurrences = session.execute(occurrences).scalars().all()
1080 return events_pb2.ListMyEventsRes(
1081 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
1082 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
1083 total_items=total_items,
1084 )
1086 def ListAllEvents(self, request, context, session):
1087 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
1088 # the page token is a unix timestamp of where we left off
1089 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
1091 occurrences = (
1092 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
1093 )
1095 if not request.include_cancelled:
1096 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
1098 if not request.past:
1099 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
1100 EventOccurrence.start_time.asc()
1101 )
1102 else:
1103 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
1104 EventOccurrence.start_time.desc()
1105 )
1107 occurrences = occurrences.limit(page_size + 1)
1108 occurrences = session.execute(occurrences).scalars().all()
1110 return events_pb2.ListAllEventsRes(
1111 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
1112 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
1113 )
1115 def InviteEventOrganizer(self, request, context, session):
1116 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
1117 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
1118 if not res:
1119 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
1121 event, occurrence = res
1123 if not _can_edit_event(session, event, context.user_id):
1124 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
1126 if occurrence.is_cancelled:
1127 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
1129 if occurrence.end_time < now() - timedelta(hours=24):
1130 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
1132 if not session.execute(
1133 select(User).where_users_visible(context).where(User.id == request.user_id)
1134 ).scalar_one_or_none():
1135 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
1137 session.add(
1138 EventOrganizer(
1139 user_id=request.user_id,
1140 event=event,
1141 )
1142 )
1143 session.flush()
1145 other_user_context = make_background_user_context(user_id=request.user_id)
1147 notify(
1148 session,
1149 user_id=request.user_id,
1150 topic_action="event:invite_organizer",
1151 key=event.id,
1152 data=notification_data_pb2.EventInviteOrganizer(
1153 event=event_to_pb(session, occurrence, other_user_context),
1154 inviting_user=user_model_to_pb(user, session, other_user_context),
1155 ),
1156 )
1158 return empty_pb2.Empty()
1160 def RemoveEventOrganizer(self, request, context, session):
1161 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
1162 if not res:
1163 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
1165 event, occurrence = res
1167 if occurrence.is_cancelled:
1168 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
1170 if occurrence.end_time < now() - timedelta(hours=24):
1171 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
1173 # Determine which user to remove
1174 user_id_to_remove = request.user_id.value if request.HasField("user_id") else context.user_id
1176 # Check if the target user is the event owner (only after permission check)
1177 if event.owner_user_id == user_id_to_remove:
1178 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_REMOVE_OWNER_AS_ORGANIZER)
1180 # Check permissions: either an organizer removing an organizer OR you're the event owner
1181 if not _can_edit_event(session, event, context.user_id):
1182 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_EDIT_PERMISSION_DENIED)
1184 # Find the organizer to remove
1185 organizer_to_remove = session.execute(
1186 select(EventOrganizer)
1187 .where(EventOrganizer.user_id == user_id_to_remove)
1188 .where(EventOrganizer.event_id == event.id)
1189 ).scalar_one_or_none()
1191 if not organizer_to_remove:
1192 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_NOT_AN_ORGANIZER)
1194 session.delete(organizer_to_remove)
1196 return empty_pb2.Empty()