Coverage for src/couchers/servicers/events.py: 84%
491 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-03-11 15:27 +0000
1import logging
2from datetime import timedelta
3from types import SimpleNamespace
5import grpc
6from google.protobuf import empty_pb2
7from psycopg2.extras import DateTimeTZRange
8from sqlalchemy.sql import and_, func, or_, select, update
10from couchers import errors
11from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope
12from couchers.jobs.enqueue import queue_job
13from couchers.models import (
14 AttendeeStatus,
15 Cluster,
16 ClusterSubscription,
17 Event,
18 EventCommunityInviteRequest,
19 EventOccurrence,
20 EventOccurrenceAttendee,
21 EventOrganizer,
22 EventSubscription,
23 Node,
24 Thread,
25 Upload,
26 User,
27)
28from couchers.notifications.notify import notify
29from couchers.servicers.api import user_model_to_pb
30from couchers.servicers.blocking import are_blocked
31from couchers.servicers.threads import thread_to_pb
32from couchers.sql import couchers_select as select
33from couchers.tasks import send_event_community_invite_request_email
34from couchers.utils import (
35 Timestamp_from_datetime,
36 create_coordinate,
37 dt_from_millis,
38 millis_from_dt,
39 now,
40 to_aware_datetime,
41)
42from proto import events_pb2, events_pb2_grpc, notification_data_pb2
43from proto.internal import jobs_pb2
logger = logging.getLogger(__name__)

# API attendance state -> database attendee status; "not going" has no status value and maps to None
attendancestate2sql = {
    events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
    events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
    events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
}

# database attendee status (None when the user has no attendance) -> API attendance state
attendancestate2api = {sql_status: api_state for api_state, sql_status in attendancestate2sql.items()}

# upper bound on the page size of the paginated list endpoints
MAX_PAGINATION_LENGTH = 25
62def _is_event_owner(event: Event, user_id):
63 """
64 Checks whether the user can act as an owner of the event
65 """
66 if event.owner_user:
67 return event.owner_user_id == user_id
68 # otherwise owned by a cluster
69 return event.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None
def _can_moderate_event(session, event: Event, user_id):
    """
    Returns whether the user may moderate the event

    This is the case if the user can moderate the parent node of the cluster owning the event (when there is one),
    or the parent node of the event itself.
    """
    owner_cluster = event.owner_cluster
    if owner_cluster is not None:
        # moderators of the owning cluster's node can moderate the cluster's events
        if can_moderate_node(session, user_id, owner_cluster.parent_node_id):
            return True

    # fall back to the node the event itself lives under
    return can_moderate_node(session, user_id, event.parent_node_id)
def _can_edit_event(session, event, user_id):
    """
    Returns whether the user may edit the event: owners can, and so can anyone allowed to moderate it
    """
    if _is_event_owner(event, user_id):
        return True
    return _can_moderate_event(session, event, user_id)
def event_to_pb(session, occurrence: EventOccurrence, context):
    """
    Builds the `events_pb2.Event` message for an occurrence, from the point of view of the user in `context`

    Args:
        session: database session used for the count queries and the thread lookup
        occurrence: the occurrence to convert; event-level fields are read from `occurrence.event`
        context: provides the `user_id` of the viewing user, which drives the attendance state, the
            organizer/subscriber flags, the edit/moderate permissions and which users are included in the counts

    Returns:
        The populated `events_pb2.Event`. Note that its `event_id` is the id of the occurrence.
    """
    event = occurrence.event
    viewer_id = context.user_id

    def _count_visible(model, *criteria):
        # counts rows of `model` matching `criteria`, restricted via `where_users_column_visible` on the row's user
        return session.execute(
            select(func.count())
            .select_from(model)
            .where_users_column_visible(context, model.user_id)
            .where(*criteria)
        ).scalar_one()

    # the occurrence of this event that ends soonest among those that have not ended yet
    next_occurrence = (
        event.occurrences.where(EventOccurrence.end_time >= now()).order_by(EventOccurrence.end_time.asc()).first()
    )

    # a cluster-owned event is reported as owned by a community (official cluster) or by a group (any other cluster)
    owner_community_id = None
    owner_group_id = None
    if event.owner_cluster:
        if event.owner_cluster.is_official_cluster:
            owner_community_id = event.owner_cluster.parent_node_id
        else:
            owner_group_id = event.owner_cluster.id

    attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == viewer_id).one_or_none()
    attendance_state = attendance.attendee_status if attendance else None

    # run the moderation check once and reuse it: editing is allowed for owners and for moderators
    can_moderate = _can_moderate_event(session, event, viewer_id)
    can_edit = _is_event_owner(event, viewer_id) or can_moderate

    going_count = _count_visible(
        EventOccurrenceAttendee,
        EventOccurrenceAttendee.occurrence_id == occurrence.id,
        EventOccurrenceAttendee.attendee_status == AttendeeStatus.going,
    )
    maybe_count = _count_visible(
        EventOccurrenceAttendee,
        EventOccurrenceAttendee.occurrence_id == occurrence.id,
        EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe,
    )
    organizer_count = _count_visible(EventOrganizer, EventOrganizer.event_id == event.id)
    subscriber_count = _count_visible(EventSubscription, EventSubscription.event_id == event.id)

    return events_pb2.Event(
        event_id=occurrence.id,
        is_next=False if not next_occurrence else occurrence.id == next_occurrence.id,
        is_cancelled=occurrence.is_cancelled,
        is_deleted=occurrence.is_deleted,
        title=event.title,
        slug=event.slug,
        content=occurrence.content,
        photo_url=occurrence.photo.full_url if occurrence.photo else None,
        online_information=(
            events_pb2.OnlineEventInformation(
                link=occurrence.link,
            )
            if occurrence.link
            else None
        ),
        offline_information=(
            events_pb2.OfflineEventInformation(
                lat=occurrence.coordinates[0],
                lng=occurrence.coordinates[1],
                address=occurrence.address,
            )
            if occurrence.geom
            else None
        ),
        created=Timestamp_from_datetime(occurrence.created),
        last_edited=Timestamp_from_datetime(occurrence.last_edited),
        creator_user_id=occurrence.creator_user_id,
        start_time=Timestamp_from_datetime(occurrence.start_time),
        end_time=Timestamp_from_datetime(occurrence.end_time),
        timezone=occurrence.timezone,
        start_time_display=str(occurrence.start_time),
        end_time_display=str(occurrence.end_time),
        attendance_state=attendancestate2api[attendance_state],
        organizer=event.organizers.where(EventOrganizer.user_id == viewer_id).one_or_none() is not None,
        subscriber=event.subscribers.where(EventSubscription.user_id == viewer_id).one_or_none() is not None,
        going_count=going_count,
        maybe_count=maybe_count,
        organizer_count=organizer_count,
        subscriber_count=subscriber_count,
        owner_user_id=event.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(session, event.thread_id),
        can_edit=can_edit,
        can_moderate=can_moderate,
    )
def _get_event_and_occurrence_query(occurrence_id, include_deleted: bool):
    """
    Builds the query selecting an occurrence by id together with the event it belongs to

    Deleted occurrences are filtered out unless `include_deleted` is set.
    """
    conditions = [
        EventOccurrence.id == occurrence_id,
        EventOccurrence.event_id == Event.id,
    ]
    if not include_deleted:
        conditions.append(~EventOccurrence.is_deleted)

    return select(Event, EventOccurrence).where(*conditions)
def _get_event_and_occurrence_one(
    session, occurrence_id, include_deleted: bool = False
) -> tuple[Event, EventOccurrence]:
    """
    Fetches the (event, occurrence) row for an occurrence id using `.one()`, for callers that expect it to exist
    """
    query = _get_event_and_occurrence_query(occurrence_id, include_deleted)
    result = session.execute(query)
    return result.one()
def _get_event_and_occurrence_one_or_none(
    session, occurrence_id, include_deleted: bool = False
) -> tuple[Event, EventOccurrence] | None:
    """
    Fetches the (event, occurrence) row for an occurrence id, or None if no such occurrence is found
    """
    query = _get_event_and_occurrence_query(occurrence_id, include_deleted)
    result = session.execute(query)
    return result.one_or_none()
def _check_occurrence_time_validity(start_time, end_time, context):
    """
    Aborts the request with INVALID_ARGUMENT unless the time range is acceptable for an occurrence

    The occurrence must not start in the past, must not end before it starts, may last at most 7 days and may start
    at most 365 days from now.
    """
    longest_duration = timedelta(days=7)
    furthest_ahead = timedelta(days=365)

    starts_in_past = start_time < now()
    if starts_in_past:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_IN_PAST)

    if end_time < start_time:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_ENDS_BEFORE_STARTS)

    duration = end_time - start_time
    if duration > longest_duration:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_LONG)

    time_until_start = start_time - now()
    if time_until_start > furthest_ahead:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_FAR_IN_FUTURE)
def get_users_to_notify_for_new_event(session, occurrence):
    """
    Works out who should be notified about a newly created event

    Returns a pair: the users to notify, and the id of the community being notified (None if the users were found
    through a geo search around the event instead).
    """
    event = occurrence.event
    cluster = event.parent_node.official_cluster

    if cluster.parent_node_id == 1:
        logger.info("The Global Community is too big for email notifications.")
        return [], event.parent_node_id

    if occurrence.creator_user in cluster.admins or cluster.is_leaf:
        # created by a community admin, or in a leaf community: notify all visible members
        visible_members = cluster.members.where(User.is_visible)
        return list(visible_members), event.parent_node_id

    # otherwise only notify visible subscribers of the community who are located near the event
    max_radius = 20000  # m
    # NOTE(review): dividing by 111111 presumably converts metres into approximate degrees -- confirm
    nearby_subscribers = (
        select(User)
        .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
        .where(User.is_visible)
        .where(ClusterSubscription.cluster_id == cluster.id)
        .where(func.ST_DWithin(User.geom, occurrence.geom, max_radius / 111111))
    )
    return session.execute(nearby_subscribers).scalars().all(), None
def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload):
    """
    Background job to generate/fan out the notifications about a newly created event

    Recipients who are in a blocking relationship with the event creator are skipped. The job gives up (logging an
    error) if the inviting user can no longer be found.
    """
    from couchers.servicers.communities import community_to_pb

    logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}")

    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
        creator = occurrence.creator_user

        recipients, node_id = get_users_to_notify_for_new_event(session, occurrence)

        inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none()

        if not inviting_user:
            logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?")
            return

        # no community id means the recipients were found through the geo search
        found_nearby = node_id is None
        topic_action = "event:create_approved" if payload.approved else "event:create_any"

        for recipient in recipients:
            if are_blocked(session, recipient.id, creator.id):
                continue
            # render the event, the inviting user and the community as seen by the recipient
            context = SimpleNamespace(user_id=recipient.id)
            notify(
                session,
                user_id=recipient.id,
                topic_action=topic_action,
                key=payload.occurrence_id,
                data=notification_data_pb2.EventCreate(
                    event=event_to_pb(session, occurrence, context),
                    inviting_user=user_model_to_pb(inviting_user, session, context),
                    nearby=True if found_nearby else None,
                    in_community=None if found_nearby else community_to_pb(session, event.parent_node, context),
                ),
            )
def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload):
    """
    Background job that notifies subscribers and attendees that an event occurrence was updated

    Every user who is subscribed to the event or has an attendance on the occurrence is notified once, except users
    in a blocking relationship with the updating user. The job gives up (logging an error) if the updating user can
    no longer be found.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)

        updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one_or_none()

        # the lookup may come back empty; bail out instead of failing on `updating_user.id` below
        if not updating_user:
            logger.error(
                f"Updating user {payload.updating_user_id} is gone while trying to send event update notification?"
            )
            return

        subscribed_user_ids = [user.id for user in event.subscribers]
        attending_user_ids = [attendance.user_id for attendance in occurrence.attendances]

        # a set so that users who both subscribe and attend are only notified once
        for user_id in set(subscribed_user_ids + attending_user_ids):
            if are_blocked(session, user_id, updating_user.id):
                continue
            # render the event and the updating user as seen by the recipient
            context = SimpleNamespace(user_id=user_id)
            notify(
                session,
                user_id=user_id,
                topic_action="event:update",
                key=payload.occurrence_id,
                data=notification_data_pb2.EventUpdate(
                    event=event_to_pb(session, occurrence, context),
                    updating_user=user_model_to_pb(updating_user, session, context),
                    updated_items=payload.updated_items,
                ),
            )
def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload):
    """
    Background job that notifies subscribers and attendees that an event occurrence was cancelled

    Every user who is subscribed to the event or has an attendance on the occurrence is notified once, except users
    in a blocking relationship with the cancelling user. The job gives up (logging an error) if the cancelling user
    can no longer be found.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)

        cancelling_user = session.execute(
            select(User).where(User.id == payload.cancelling_user_id)
        ).scalar_one_or_none()

        # the lookup may come back empty; bail out instead of failing on `cancelling_user.id` below
        if not cancelling_user:
            logger.error(
                f"Cancelling user {payload.cancelling_user_id} is gone while trying to send event cancel notification?"
            )
            return

        subscribed_user_ids = [user.id for user in event.subscribers]
        attending_user_ids = [attendance.user_id for attendance in occurrence.attendances]

        # a set so that users who both subscribe and attend are only notified once
        for user_id in set(subscribed_user_ids + attending_user_ids):
            if are_blocked(session, user_id, cancelling_user.id):
                continue
            # render the event and the cancelling user as seen by the recipient
            context = SimpleNamespace(user_id=user_id)
            notify(
                session,
                user_id=user_id,
                topic_action="event:cancel",
                key=payload.occurrence_id,
                data=notification_data_pb2.EventCancel(
                    event=event_to_pb(session, occurrence, context),
                    cancelling_user=user_model_to_pb(cancelling_user, session, context),
                ),
            )
def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload):
    """
    Background job that notifies subscribers and attendees that an event occurrence was deleted

    The occurrence is looked up with deleted occurrences included. Every user who is subscribed to the event or has
    an attendance on the occurrence is notified once.
    """
    with session_scope() as session:
        event, occurrence = _get_event_and_occurrence_one(
            session, occurrence_id=payload.occurrence_id, include_deleted=True
        )

        subscriber_ids = [subscriber.id for subscriber in event.subscribers]
        attendee_ids = [attendance.user_id for attendance in occurrence.attendances]
        recipient_ids = set(subscriber_ids + attendee_ids)

        for recipient_id in recipient_ids:
            # render the event as seen by the recipient
            recipient_context = SimpleNamespace(user_id=recipient_id)
            event_pb = event_to_pb(session, occurrence, recipient_context)
            notify(
                session,
                user_id=recipient_id,
                topic_action="event:delete",
                key=payload.occurrence_id,
                data=notification_data_pb2.EventDelete(event=event_pb),
            )
358class Events(events_pb2_grpc.EventsServicer):
359 def CreateEvent(self, request, context, session):
360 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
361 if not user.has_completed_profile:
362 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_CREATE_EVENT)
363 if not request.title:
364 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_TITLE)
365 if not request.content:
366 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
367 if request.HasField("online_information"):
368 online = True
369 geom = None
370 address = None
371 if not request.online_information.link:
372 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
373 link = request.online_information.link
374 elif request.HasField("offline_information"):
375 online = False
376 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value
377 if not (
378 request.offline_information.address
379 and request.offline_information.lat
380 and request.offline_information.lng
381 ):
382 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
383 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
384 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
385 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
386 address = request.offline_information.address
387 link = None
388 else:
389 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)
391 start_time = to_aware_datetime(request.start_time)
392 end_time = to_aware_datetime(request.end_time)
394 _check_occurrence_time_validity(start_time, end_time, context)
396 if request.parent_community_id:
397 parent_node = session.execute(
398 select(Node).where(Node.id == request.parent_community_id)
399 ).scalar_one_or_none()
400 else:
401 if online:
402 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_MISSING_PARENT_COMMUNITY)
403 # parent community computed from geom
404 parent_node = get_parent_node_at_location(session, geom)
406 if not parent_node:
407 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)
409 if (
410 request.photo_key
411 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
412 ):
413 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
415 event = Event(
416 title=request.title,
417 parent_node_id=parent_node.id,
418 owner_user_id=context.user_id,
419 thread=Thread(),
420 creator_user_id=context.user_id,
421 )
422 session.add(event)
424 occurrence = EventOccurrence(
425 event=event,
426 content=request.content,
427 geom=geom,
428 address=address,
429 link=link,
430 photo_key=request.photo_key if request.photo_key != "" else None,
431 # timezone=timezone,
432 during=DateTimeTZRange(start_time, end_time),
433 creator_user_id=context.user_id,
434 )
435 session.add(occurrence)
437 session.add(
438 EventOrganizer(
439 user_id=context.user_id,
440 event=event,
441 )
442 )
444 session.add(
445 EventSubscription(
446 user_id=context.user_id,
447 event=event,
448 )
449 )
451 session.add(
452 EventOccurrenceAttendee(
453 user_id=context.user_id,
454 occurrence=occurrence,
455 attendee_status=AttendeeStatus.going,
456 )
457 )
459 session.commit()
461 if user.has_completed_profile:
462 queue_job(
463 session,
464 "generate_event_create_notifications",
465 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
466 inviting_user_id=user.id,
467 occurrence_id=occurrence.id,
468 approved=False,
469 ),
470 )
472 return event_to_pb(session, occurrence, context)
474 def ScheduleEvent(self, request, context, session):
475 if not request.content:
476 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
477 if request.HasField("online_information"):
478 geom = None
479 address = None
480 link = request.online_information.link
481 elif request.HasField("offline_information"):
482 if not (
483 request.offline_information.address
484 and request.offline_information.lat
485 and request.offline_information.lng
486 ):
487 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
488 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
489 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
490 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
491 address = request.offline_information.address
492 link = None
493 else:
494 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)
496 start_time = to_aware_datetime(request.start_time)
497 end_time = to_aware_datetime(request.end_time)
499 _check_occurrence_time_validity(start_time, end_time, context)
501 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
502 if not res:
503 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
505 event, occurrence = res
507 if not _can_edit_event(session, event, context.user_id):
508 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
510 if occurrence.is_cancelled:
511 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
513 if (
514 request.photo_key
515 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
516 ):
517 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
519 during = DateTimeTZRange(start_time, end_time)
521 # && is the overlap operator for ranges
522 if (
523 session.execute(
524 select(EventOccurrence.id)
525 .where(EventOccurrence.event_id == event.id)
526 .where(EventOccurrence.during.op("&&")(during))
527 )
528 .scalars()
529 .first()
530 is not None
531 ):
532 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)
534 occurrence = EventOccurrence(
535 event=event,
536 content=request.content,
537 geom=geom,
538 address=address,
539 link=link,
540 photo_key=request.photo_key if request.photo_key != "" else None,
541 # timezone=timezone,
542 during=during,
543 creator_user_id=context.user_id,
544 )
545 session.add(occurrence)
547 session.add(
548 EventOccurrenceAttendee(
549 user_id=context.user_id,
550 occurrence=occurrence,
551 attendee_status=AttendeeStatus.going,
552 )
553 )
555 session.flush()
557 # TODO: notify
559 return event_to_pb(session, occurrence, context)
561 def UpdateEvent(self, request, context, session):
562 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
563 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
564 if not res:
565 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
567 event, occurrence = res
569 if not _can_edit_event(session, event, context.user_id):
570 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
572 # the things that were updated and need to be notified about
573 notify_updated = []
575 if occurrence.is_cancelled:
576 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
578 occurrence_update = {"last_edited": now()}
580 if request.HasField("title"):
581 notify_updated.append("title")
582 event.title = request.title.value
583 event.last_edited = now()
585 if request.HasField("content"):
586 notify_updated.append("content")
587 occurrence_update["content"] = request.content.value
589 if request.HasField("photo_key"):
590 occurrence_update["photo_key"] = request.photo_key.value
592 if request.HasField("online_information"):
593 notify_updated.append("location")
594 if not request.online_information.link:
595 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
596 occurrence_update["link"] = request.online_information.link
597 occurrence_update["geom"] = None
598 occurrence_update["address"] = None
599 elif request.HasField("offline_information"):
600 notify_updated.append("location")
601 occurrence_update["link"] = None
602 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
603 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
604 occurrence_update["geom"] = create_coordinate(
605 request.offline_information.lat, request.offline_information.lng
606 )
607 occurrence_update["address"] = request.offline_information.address
609 if request.HasField("start_time") or request.HasField("end_time"):
610 if request.update_all_future:
611 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_CANT_UPDATE_ALL_TIMES)
612 if request.HasField("start_time"):
613 notify_updated.append("start time")
614 start_time = to_aware_datetime(request.start_time)
615 else:
616 start_time = occurrence.start_time
617 if request.HasField("end_time"):
618 notify_updated.append("end time")
619 end_time = to_aware_datetime(request.end_time)
620 else:
621 end_time = occurrence.end_time
623 _check_occurrence_time_validity(start_time, end_time, context)
625 during = DateTimeTZRange(start_time, end_time)
627 # && is the overlap operator for ranges
628 if (
629 session.execute(
630 select(EventOccurrence.id)
631 .where(EventOccurrence.event_id == event.id)
632 .where(EventOccurrence.id != occurrence.id)
633 .where(EventOccurrence.during.op("&&")(during))
634 )
635 .scalars()
636 .first()
637 is not None
638 ):
639 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)
641 occurrence_update["during"] = during
643 # TODO
644 # if request.HasField("timezone"):
645 # occurrence_update["timezone"] = request.timezone
647 # allow editing any event which hasn't ended more than 24 hours before now
648 # when editing all future events, we edit all which have not yet ended
650 if request.update_all_future:
651 session.execute(
652 update(EventOccurrence)
653 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
654 .where(EventOccurrence.start_time >= occurrence.start_time)
655 .values(occurrence_update)
656 .execution_options(synchronize_session=False)
657 )
658 else:
659 if occurrence.end_time < now() - timedelta(hours=24):
660 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
661 session.execute(
662 update(EventOccurrence)
663 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
664 .where(EventOccurrence.id == occurrence.id)
665 .values(occurrence_update)
666 .execution_options(synchronize_session=False)
667 )
669 session.flush()
671 if notify_updated:
672 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying")
674 queue_job(
675 session,
676 "generate_event_update_notifications",
677 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload(
678 updating_user_id=user.id,
679 occurrence_id=occurrence.id,
680 updated_items=notify_updated,
681 ),
682 )
684 # since we have synchronize_session=False, we have to refresh the object
685 session.refresh(occurrence)
687 return event_to_pb(session, occurrence, context)
689 def GetEvent(self, request, context, session):
690 occurrence = session.execute(
691 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
692 ).scalar_one_or_none()
694 if not occurrence:
695 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
697 return event_to_pb(session, occurrence, context)
699 def CancelEvent(self, request, context, session):
700 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
701 if not res:
702 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
704 event, occurrence = res
706 if not _can_edit_event(session, event, context.user_id):
707 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
709 if occurrence.end_time < now() - timedelta(hours=24):
710 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_CANCEL_OLD_EVENT)
712 occurrence.is_cancelled = True
714 queue_job(
715 session,
716 "generate_event_cancel_notifications",
717 payload=jobs_pb2.GenerateEventCancelNotificationsPayload(
718 cancelling_user_id=context.user_id,
719 occurrence_id=occurrence.id,
720 ),
721 )
723 return empty_pb2.Empty()
725 def RequestCommunityInvite(self, request, context, session):
726 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
727 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
728 if not res:
729 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
731 event, occurrence = res
733 if not _can_edit_event(session, event, context.user_id):
734 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
736 if occurrence.is_cancelled:
737 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
739 if occurrence.end_time < now() - timedelta(hours=24):
740 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
742 this_user_reqs = [req for req in occurrence.community_invite_requests if req.user_id == context.user_id]
744 if len(this_user_reqs) > 0:
745 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_REQUESTED)
747 approved_reqs = [req for req in occurrence.community_invite_requests if req.approved]
749 if len(approved_reqs) > 0:
750 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_APPROVED)
752 request = EventCommunityInviteRequest(
753 occurrence_id=request.event_id,
754 user_id=context.user_id,
755 )
756 session.add(request)
757 session.flush()
759 send_event_community_invite_request_email(session, request)
761 return empty_pb2.Empty()
763 def ListEventOccurrences(self, request, context, session):
764 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
765 # the page token is a unix timestamp of where we left off
766 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
767 occurrence = session.execute(
768 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
769 ).scalar_one_or_none()
770 if not occurrence:
771 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
773 occurrences = (
774 select(EventOccurrence).where(EventOccurrence.event_id == Event.id).where(~EventOccurrence.is_deleted)
775 )
777 if not request.include_cancelled:
778 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
780 if not request.past:
781 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
782 EventOccurrence.start_time.asc()
783 )
784 else:
785 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
786 EventOccurrence.start_time.desc()
787 )
789 occurrences = occurrences.limit(page_size + 1)
790 occurrences = session.execute(occurrences).scalars().all()
792 return events_pb2.ListEventOccurrencesRes(
793 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
794 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
795 )
797 def ListEventAttendees(self, request, context, session):
798 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
799 next_user_id = int(request.page_token) if request.page_token else 0
800 occurrence = session.execute(
801 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
802 ).scalar_one_or_none()
803 if not occurrence:
804 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
805 attendees = (
806 session.execute(
807 select(EventOccurrenceAttendee)
808 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
809 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
810 .where(EventOccurrenceAttendee.user_id >= next_user_id)
811 .order_by(EventOccurrenceAttendee.user_id)
812 .limit(page_size + 1)
813 )
814 .scalars()
815 .all()
816 )
817 return events_pb2.ListEventAttendeesRes(
818 attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]],
819 next_page_token=str(attendees[-1].user_id) if len(attendees) > page_size else None,
820 )
822 def ListEventSubscribers(self, request, context, session):
823 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
824 next_user_id = int(request.page_token) if request.page_token else 0
825 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
826 if not res:
827 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
828 event, occurrence = res
829 subscribers = (
830 session.execute(
831 select(EventSubscription)
832 .where_users_column_visible(context, EventSubscription.user_id)
833 .where(EventSubscription.event_id == event.id)
834 .where(EventSubscription.user_id >= next_user_id)
835 .order_by(EventSubscription.user_id)
836 .limit(page_size + 1)
837 )
838 .scalars()
839 .all()
840 )
841 return events_pb2.ListEventSubscribersRes(
842 subscriber_user_ids=[subscriber.user_id for subscriber in subscribers[:page_size]],
843 next_page_token=str(subscribers[-1].user_id) if len(subscribers) > page_size else None,
844 )
846 def ListEventOrganizers(self, request, context, session):
847 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
848 next_user_id = int(request.page_token) if request.page_token else 0
849 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
850 if not res:
851 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
852 event, occurrence = res
853 organizers = (
854 session.execute(
855 select(EventOrganizer)
856 .where_users_column_visible(context, EventOrganizer.user_id)
857 .where(EventOrganizer.event_id == event.id)
858 .where(EventOrganizer.user_id >= next_user_id)
859 .order_by(EventOrganizer.user_id)
860 .limit(page_size + 1)
861 )
862 .scalars()
863 .all()
864 )
865 return events_pb2.ListEventOrganizersRes(
866 organizer_user_ids=[organizer.user_id for organizer in organizers[:page_size]],
867 next_page_token=str(organizers[-1].user_id) if len(organizers) > page_size else None,
868 )
def TransferEvent(self, request, context, session):
    """Transfer ownership of an event from a user to a group or community.

    ``request.event_id`` is used as an occurrence id to look up the event.
    The new owner is selected through the ``new_owner`` oneof: either
    ``new_owner_group_id`` (a non-official cluster, looked up by cluster id)
    or ``new_owner_community_id`` (the official cluster whose parent node has
    that id).

    Aborts with:
      * NOT_FOUND if the event does not exist, or if no matching group or
        community is found (including when the oneof is not set at all);
      * PERMISSION_DENIED if ``_can_edit_event`` rejects the caller, or the
        occurrence is cancelled;
      * FAILED_PRECONDITION if the occurrence ended more than 24 hours ago.

    Returns the updated event protobuf for the looked-up occurrence.
    """
    res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not res:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    event, occurrence = res

    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_TRANSFER_PERMISSION_DENIED)

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    if occurrence.end_time < now() - timedelta(hours=24):
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    # WhichOneof returns None when neither field of the oneof is set; start from
    # None so that case falls through to the NOT_FOUND abort below instead of
    # raising UnboundLocalError on `cluster`.
    cluster = None
    new_owner_kind = request.WhichOneof("new_owner")
    if new_owner_kind == "new_owner_group_id":
        cluster = session.execute(
            select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
        ).scalar_one_or_none()
    elif new_owner_kind == "new_owner_community_id":
        cluster = session.execute(
            select(Cluster)
            .where(Cluster.parent_node_id == request.new_owner_community_id)
            .where(Cluster.is_official_cluster)
        ).scalar_one_or_none()

    if not cluster:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

    # the event is owned by the cluster from now on, no longer by a user
    event.owner_user = None
    event.owner_cluster = cluster

    session.commit()
    return event_to_pb(session, occurrence, context)
def SetEventSubscription(self, request, context, session):
    """Subscribe the calling user to an event, or unsubscribe them.

    ``request.subscribe`` is the desired state; the call is a no-op on the
    subscription table when the user is already in that state. Aborts with
    NOT_FOUND if the event does not exist, PERMISSION_DENIED if the occurrence
    is cancelled, and FAILED_PRECONDITION if it ended more than 24 hours ago.
    Returns the event protobuf for the looked-up occurrence.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
    event, occurrence = found

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    cutoff = now() - timedelta(hours=24)
    if occurrence.end_time < cutoff:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    existing = session.execute(
        select(EventSubscription)
        .where(EventSubscription.user_id == context.user_id)
        .where(EventSubscription.event_id == event.id)
    ).scalar_one_or_none()

    if request.subscribe:
        # wants to be subscribed: only create a row if there isn't one yet
        if not existing:
            session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
    elif existing:
        # wants to be unsubscribed and currently is subscribed: drop the row
        session.delete(existing)

    session.flush()

    return event_to_pb(session, occurrence, context)
def SetEventAttendance(self, request, context, session):
    """Set the calling user's attendance state for one event occurrence.

    NOT_GOING removes any existing attendance row; any other state updates the
    existing row or creates a new one, mapping the API state to the database
    status through ``attendancestate2sql``. Aborts with NOT_FOUND if the
    occurrence does not exist or is deleted, PERMISSION_DENIED if it is
    cancelled, and FAILED_PRECONDITION if it ended more than 24 hours ago.
    Returns the event protobuf for the occurrence.
    """
    occurrence = session.execute(
        select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
    ).scalar_one_or_none()

    if not occurrence:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    cutoff = now() - timedelta(hours=24)
    if occurrence.end_time < cutoff:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    existing = session.execute(
        select(EventOccurrenceAttendee)
        .where(EventOccurrenceAttendee.user_id == context.user_id)
        .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
    ).scalar_one_or_none()

    if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING:
        # "not going" is represented by the absence of a row
        if existing:
            session.delete(existing)
    elif existing:
        # already has a row: just change its status
        existing.attendee_status = attendancestate2sql[request.attendance_state]
    else:
        # first time this user responds for this occurrence
        session.add(
            EventOccurrenceAttendee(
                user_id=context.user_id,
                occurrence_id=occurrence.id,
                attendee_status=attendancestate2sql[request.attendance_state],
            )
        )

    session.flush()

    return event_to_pb(session, occurrence, context)
def ListMyEvents(self, request, context, session):
    """List event occurrences related to the calling user.

    An occurrence is included if it matches at least one of the enabled
    filters: the user is subscribed to the event, organizes it, attends the
    occurrence, or the event belongs to a community the user is subscribed to
    through its official cluster. When none of the filter flags is set on the
    request, all four are applied.

    Two pagination modes are supported:
      * ``page_number`` set: offset-based paging of ``page_size`` items; the
        page token is ignored and the time cutoff is the current time;
      * otherwise: token-based paging, where the token is a millisecond
        timestamp used as the time cutoff.

    ``total_items`` is the count of all occurrences matching the filters and
    the time cutoff, before any offset/limit is applied.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    # the page token is a unix timestamp (ms) of where we left off; it is only
    # honoured when the caller is not paging by page number
    if request.page_token and not request.page_number:
        cutoff = dt_from_millis(int(request.page_token))
    else:
        cutoff = now()

    # offset used when paging by page number (pages are 1-based)
    current_page = request.page_number or 1
    offset = (current_page - 1) * page_size

    query = select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)

    # no explicit filter requested means "everything related to me"
    include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities)
    include_subscribed = request.subscribed or include_all
    include_organizing = request.organizing or include_all
    include_attending = request.attending or include_all
    include_my_communities = request.my_communities or include_all

    # Each enabled filter contributes one condition; they are OR-ed together.
    # Note: `!= None` is intentional, SQLAlchemy turns it into `IS NOT NULL`.
    conditions = []

    if include_subscribed:
        query = query.outerjoin(
            EventSubscription,
            and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
        )
        conditions.append(EventSubscription.user_id != None)

    if include_organizing:
        query = query.outerjoin(
            EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id)
        )
        conditions.append(EventOrganizer.user_id != None)

    if include_attending:
        query = query.outerjoin(
            EventOccurrenceAttendee,
            and_(
                EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                EventOccurrenceAttendee.user_id == context.user_id,
            ),
        )
        conditions.append(EventOccurrenceAttendee.user_id != None)

    if include_my_communities:
        # ids of the nodes whose official cluster the user is subscribed to
        community_ids_query = (
            select(Node.id)
            .join(Cluster, Cluster.parent_node_id == Node.id)
            .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
            .where(ClusterSubscription.user_id == context.user_id)
            .where(Cluster.is_official_cluster)
            .order_by(Node.id)
            .limit(100000)
        )
        community_ids = session.execute(community_ids_query).scalars().all()
        conditions.append(Event.parent_node_id.in_(community_ids))

    query = query.where(or_(*conditions))

    if not request.include_cancelled:
        query = query.where(~EventOccurrence.is_cancelled)

    one_second = timedelta(seconds=1)
    if request.past:
        # past events: ended before the cutoff, most recently started first
        query = query.where(EventOccurrence.end_time < cutoff + one_second).order_by(
            EventOccurrence.start_time.desc()
        )
    else:
        # current/upcoming events: end after the cutoff, earliest start first
        query = query.where(EventOccurrence.end_time > cutoff - one_second).order_by(
            EventOccurrence.start_time.asc()
        )

    # count everything that matches, before offset/limit are applied
    total_items = session.execute(select(func.count()).select_from(query.subquery())).scalar()

    if request.page_number:
        # page-number mode: exactly one page worth of rows
        query = query.offset(offset).limit(page_size)
    else:
        # token mode: one extra row tells us whether there is a next page
        query = query.limit(page_size + 1)

    rows = session.execute(query).scalars().all()
    has_more = len(rows) > page_size

    return events_pb2.ListMyEventsRes(
        events=[event_to_pb(session, row, context) for row in rows[:page_size]],
        next_page_token=str(millis_from_dt(rows[-1].end_time)) if has_more else None,
        total_items=total_items,
    )
def ListAllEvents(self, request, context, session):
    """List all non-deleted event occurrences, one page at a time.

    The page token is a millisecond timestamp used as the time cutoff; without
    a token the current time is used. With ``request.past`` set, occurrences
    that ended before the cutoff are returned, latest start first; otherwise
    occurrences ending after the cutoff are returned, earliest start first.
    Cancelled occurrences are excluded unless ``request.include_cancelled``.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

    # the page token is a unix timestamp (ms) of where we left off
    if request.page_token:
        cutoff = dt_from_millis(int(request.page_token))
    else:
        cutoff = now()

    query = select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)

    if not request.include_cancelled:
        query = query.where(~EventOccurrence.is_cancelled)

    one_second = timedelta(seconds=1)
    if request.past:
        query = query.where(EventOccurrence.end_time < cutoff + one_second).order_by(
            EventOccurrence.start_time.desc()
        )
    else:
        query = query.where(EventOccurrence.end_time > cutoff - one_second).order_by(
            EventOccurrence.start_time.asc()
        )

    # one extra row tells us whether there is a next page
    rows = session.execute(query.limit(page_size + 1)).scalars().all()
    has_more = len(rows) > page_size

    return events_pb2.ListAllEventsRes(
        events=[event_to_pb(session, row, context) for row in rows[:page_size]],
        next_page_token=str(millis_from_dt(rows[-1].end_time)) if has_more else None,
    )
def InviteEventOrganizer(self, request, context, session):
    """Add another user as organizer of an event and notify them.

    Aborts with NOT_FOUND if the event or the invited user (as visible to the
    caller) does not exist, PERMISSION_DENIED if ``_can_edit_event`` rejects
    the caller or the occurrence is cancelled, and FAILED_PRECONDITION if the
    occurrence ended more than 24 hours ago. Returns an empty message.
    """
    inviter = session.execute(select(User).where(User.id == context.user_id)).scalar_one()

    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
    event, occurrence = found

    if not _can_edit_event(session, event, context.user_id):
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    cutoff = now() - timedelta(hours=24)
    if occurrence.end_time < cutoff:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    # the invited user must exist and be visible to the caller
    invitee = session.execute(
        select(User).where_users_visible(context).where(User.id == request.user_id)
    ).scalar_one_or_none()
    if not invitee:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    new_organizer = EventOrganizer(
        user_id=request.user_id,
        event=event,
    )
    session.add(new_organizer)
    session.flush()

    # build the notification payload with a context carrying the invited
    # user's id rather than the caller's
    invitee_context = SimpleNamespace(user_id=request.user_id)
    payload = notification_data_pb2.EventInviteOrganizer(
        event=event_to_pb(session, occurrence, invitee_context),
        inviting_user=user_model_to_pb(inviter, session, invitee_context),
    )

    notify(
        session,
        user_id=request.user_id,
        topic_action="event:invite_organizer",
        key=event.id,
        data=payload,
    )

    return empty_pb2.Empty()
def RemoveEventOrganizer(self, request, context, session):
    """Remove the calling user from the organizers of an event.

    Aborts with NOT_FOUND if the event does not exist, PERMISSION_DENIED if
    the occurrence is cancelled, and FAILED_PRECONDITION if the occurrence
    ended more than 24 hours ago, if the caller is the event's owner, or if
    the caller is not an organizer of the event. Returns an empty message.
    """
    found = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
    if not found:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
    event, occurrence = found

    if occurrence.is_cancelled:
        context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)

    cutoff = now() - timedelta(hours=24)
    if occurrence.end_time < cutoff:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)

    # the owner cannot step down as organizer
    if event.owner_user_id == context.user_id:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_REMOVE_OWNER_AS_ORGANIZER)

    organizer_row = session.execute(
        select(EventOrganizer)
        .where(EventOrganizer.user_id == context.user_id)
        .where(EventOrganizer.event_id == event.id)
    ).scalar_one_or_none()

    if not organizer_row:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_NOT_AN_ORGANIZER)

    session.delete(organizer_row)

    return empty_pb2.Empty()