Coverage for src/couchers/servicers/events.py: 84%
490 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-06-01 15:07 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from psycopg2.extras import DateTimeTZRange
7from sqlalchemy.sql import and_, func, or_, select, update
9from couchers import errors
10from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope
11from couchers.jobs.enqueue import queue_job
12from couchers.models import (
13 AttendeeStatus,
14 Cluster,
15 ClusterSubscription,
16 Event,
17 EventCommunityInviteRequest,
18 EventOccurrence,
19 EventOccurrenceAttendee,
20 EventOrganizer,
21 EventSubscription,
22 Node,
23 Thread,
24 Upload,
25 User,
26)
27from couchers.notifications.notify import notify
28from couchers.servicers.api import user_model_to_pb
29from couchers.servicers.blocking import are_blocked
30from couchers.servicers.threads import thread_to_pb
31from couchers.sql import couchers_select as select
32from couchers.tasks import send_event_community_invite_request_email
33from couchers.utils import (
34 Timestamp_from_datetime,
35 create_coordinate,
36 dt_from_millis,
37 make_user_context,
38 millis_from_dt,
39 now,
40 to_aware_datetime,
41)
42from proto import events_pb2, events_pb2_grpc, notification_data_pb2
43from proto.internal import jobs_pb2
45logger = logging.getLogger(__name__)
47attendancestate2sql = {
48 events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
49 events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
50 events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
51}
53attendancestate2api = {
54 None: events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING,
55 AttendeeStatus.going: events_pb2.AttendanceState.ATTENDANCE_STATE_GOING,
56 AttendeeStatus.maybe: events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE,
57}
59MAX_PAGINATION_LENGTH = 25
62def _is_event_owner(event: Event, user_id):
63 """
64 Checks whether the user can act as an owner of the event
65 """
66 if event.owner_user:
67 return event.owner_user_id == user_id
68 # otherwise owned by a cluster
69 return event.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None
72def _can_moderate_event(session, event: Event, user_id):
73 # if the event is owned by a cluster, then any moderator of that cluster can moderate this event
74 if event.owner_cluster is not None and can_moderate_node(session, user_id, event.owner_cluster.parent_node_id):
75 return True
77 # finally check if the user can moderate the parent node of the cluster
78 return can_moderate_node(session, user_id, event.parent_node_id)
81def _can_edit_event(session, event, user_id):
82 return _is_event_owner(event, user_id) or _can_moderate_event(session, event, user_id)
85def event_to_pb(session, occurrence: EventOccurrence, context):
86 event = occurrence.event
88 next_occurrence = (
89 event.occurrences.where(EventOccurrence.end_time >= now())
90 .order_by(EventOccurrence.end_time.asc())
91 .limit(1)
92 .one_or_none()
93 )
95 owner_community_id = None
96 owner_group_id = None
97 if event.owner_cluster:
98 if event.owner_cluster.is_official_cluster:
99 owner_community_id = event.owner_cluster.parent_node_id
100 else:
101 owner_group_id = event.owner_cluster.id
103 attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == context.user_id).one_or_none()
104 attendance_state = attendance.attendee_status if attendance else None
106 can_moderate = _can_moderate_event(session, event, context.user_id)
107 can_edit = _can_edit_event(session, event, context.user_id)
109 going_count = session.execute(
110 select(func.count())
111 .select_from(EventOccurrenceAttendee)
112 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
113 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
114 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.going)
115 ).scalar_one()
116 maybe_count = session.execute(
117 select(func.count())
118 .select_from(EventOccurrenceAttendee)
119 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
120 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
121 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe)
122 ).scalar_one()
124 organizer_count = session.execute(
125 select(func.count())
126 .select_from(EventOrganizer)
127 .where_users_column_visible(context, EventOrganizer.user_id)
128 .where(EventOrganizer.event_id == event.id)
129 ).scalar_one()
130 subscriber_count = session.execute(
131 select(func.count())
132 .select_from(EventSubscription)
133 .where_users_column_visible(context, EventSubscription.user_id)
134 .where(EventSubscription.event_id == event.id)
135 ).scalar_one()
137 return events_pb2.Event(
138 event_id=occurrence.id,
139 is_next=False if not next_occurrence else occurrence.id == next_occurrence.id,
140 is_cancelled=occurrence.is_cancelled,
141 is_deleted=occurrence.is_deleted,
142 title=event.title,
143 slug=event.slug,
144 content=occurrence.content,
145 photo_url=occurrence.photo.full_url if occurrence.photo else None,
146 online_information=(
147 events_pb2.OnlineEventInformation(
148 link=occurrence.link,
149 )
150 if occurrence.link
151 else None
152 ),
153 offline_information=(
154 events_pb2.OfflineEventInformation(
155 lat=occurrence.coordinates[0],
156 lng=occurrence.coordinates[1],
157 address=occurrence.address,
158 )
159 if occurrence.geom
160 else None
161 ),
162 created=Timestamp_from_datetime(occurrence.created),
163 last_edited=Timestamp_from_datetime(occurrence.last_edited),
164 creator_user_id=occurrence.creator_user_id,
165 start_time=Timestamp_from_datetime(occurrence.start_time),
166 end_time=Timestamp_from_datetime(occurrence.end_time),
167 timezone=occurrence.timezone,
168 start_time_display=str(occurrence.start_time),
169 end_time_display=str(occurrence.end_time),
170 attendance_state=attendancestate2api[attendance_state],
171 organizer=event.organizers.where(EventOrganizer.user_id == context.user_id).one_or_none() is not None,
172 subscriber=event.subscribers.where(EventSubscription.user_id == context.user_id).one_or_none() is not None,
173 going_count=going_count,
174 maybe_count=maybe_count,
175 organizer_count=organizer_count,
176 subscriber_count=subscriber_count,
177 owner_user_id=event.owner_user_id,
178 owner_community_id=owner_community_id,
179 owner_group_id=owner_group_id,
180 thread=thread_to_pb(session, event.thread_id),
181 can_edit=can_edit,
182 can_moderate=can_moderate,
183 )
186def _get_event_and_occurrence_query(occurrence_id, include_deleted: bool):
187 query = (
188 select(Event, EventOccurrence)
189 .where(EventOccurrence.id == occurrence_id)
190 .where(EventOccurrence.event_id == Event.id)
191 )
193 if not include_deleted:
194 query = query.where(~EventOccurrence.is_deleted)
196 return query
199def _get_event_and_occurrence_one(
200 session, occurrence_id, include_deleted: bool = False
201) -> tuple[Event, EventOccurrence]:
202 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one()
205def _get_event_and_occurrence_one_or_none(
206 session, occurrence_id, include_deleted: bool = False
207) -> tuple[Event, EventOccurrence] | None:
208 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one_or_none()
211def _check_occurrence_time_validity(start_time, end_time, context):
212 if start_time < now():
213 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_IN_PAST)
214 if end_time < start_time:
215 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_ENDS_BEFORE_STARTS)
216 if end_time - start_time > timedelta(days=7):
217 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_LONG)
218 if start_time - now() > timedelta(days=365):
219 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_FAR_IN_FUTURE)
222def get_users_to_notify_for_new_event(session, occurrence):
223 """
224 Returns the users to notify, as well as the community id that is being notified (None if based on geo search)
225 """
226 cluster = occurrence.event.parent_node.official_cluster
227 if cluster.parent_node_id == 1:
228 logger.info("The Global Community is too big for email notifications.")
229 return [], occurrence.event.parent_node_id
230 elif occurrence.creator_user in cluster.admins or cluster.is_leaf:
231 return list(cluster.members.where(User.is_visible)), occurrence.event.parent_node_id
232 else:
233 max_radius = 20000 # m
234 users = (
235 session.execute(
236 select(User)
237 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
238 .where(User.is_visible)
239 .where(ClusterSubscription.cluster_id == cluster.id)
240 .where(func.ST_DWithin(User.geom, occurrence.geom, max_radius / 111111))
241 )
242 .scalars()
243 .all()
244 )
245 return users, None
248def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload):
249 """
250 Background job to generated/fan out event notifications
251 """
252 from couchers.servicers.communities import community_to_pb
254 logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}")
256 with session_scope() as session:
257 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
258 creator = occurrence.creator_user
260 users, node_id = get_users_to_notify_for_new_event(session, occurrence)
262 inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none()
264 if not inviting_user:
265 logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?")
266 return
268 for user in users:
269 if are_blocked(session, user.id, creator.id):
270 continue
271 context = make_user_context(user_id=user.id)
272 notify(
273 session,
274 user_id=user.id,
275 topic_action="event:create_approved" if payload.approved else "event:create_any",
276 key=payload.occurrence_id,
277 data=notification_data_pb2.EventCreate(
278 event=event_to_pb(session, occurrence, context),
279 inviting_user=user_model_to_pb(inviting_user, session, context),
280 nearby=True if node_id is None else None,
281 in_community=community_to_pb(session, event.parent_node, context) if node_id is not None else None,
282 ),
283 )
286def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload):
287 with session_scope() as session:
288 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
290 updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one_or_none()
292 subscribed_user_ids = [user.id for user in event.subscribers]
293 attending_user_ids = [user.user_id for user in occurrence.attendances]
295 for user_id in set(subscribed_user_ids + attending_user_ids):
296 if are_blocked(session, user_id, updating_user.id):
297 continue
298 context = make_user_context(user_id=user_id)
299 notify(
300 session,
301 user_id=user_id,
302 topic_action="event:update",
303 key=payload.occurrence_id,
304 data=notification_data_pb2.EventUpdate(
305 event=event_to_pb(session, occurrence, context),
306 updating_user=user_model_to_pb(updating_user, session, context),
307 updated_items=payload.updated_items,
308 ),
309 )
312def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload):
313 with session_scope() as session:
314 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
316 cancelling_user = session.execute(
317 select(User).where(User.id == payload.cancelling_user_id)
318 ).scalar_one_or_none()
320 subscribed_user_ids = [user.id for user in event.subscribers]
321 attending_user_ids = [user.user_id for user in occurrence.attendances]
323 for user_id in set(subscribed_user_ids + attending_user_ids):
324 if are_blocked(session, user_id, cancelling_user.id):
325 continue
326 context = make_user_context(user_id=user_id)
327 notify(
328 session,
329 user_id=user_id,
330 topic_action="event:cancel",
331 key=payload.occurrence_id,
332 data=notification_data_pb2.EventCancel(
333 event=event_to_pb(session, occurrence, context),
334 cancelling_user=user_model_to_pb(cancelling_user, session, context),
335 ),
336 )
339def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload):
340 with session_scope() as session:
341 event, occurrence = _get_event_and_occurrence_one(
342 session, occurrence_id=payload.occurrence_id, include_deleted=True
343 )
345 subscribed_user_ids = [user.id for user in event.subscribers]
346 attending_user_ids = [user.user_id for user in occurrence.attendances]
348 for user_id in set(subscribed_user_ids + attending_user_ids):
349 context = make_user_context(user_id=user_id)
350 notify(
351 session,
352 user_id=user_id,
353 topic_action="event:delete",
354 key=payload.occurrence_id,
355 data=notification_data_pb2.EventDelete(
356 event=event_to_pb(session, occurrence, context),
357 ),
358 )
361class Events(events_pb2_grpc.EventsServicer):
362 def CreateEvent(self, request, context, session):
363 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
364 if not user.has_completed_profile:
365 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_CREATE_EVENT)
366 if not request.title:
367 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_TITLE)
368 if not request.content:
369 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
370 if request.HasField("online_information"):
371 online = True
372 geom = None
373 address = None
374 if not request.online_information.link:
375 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
376 link = request.online_information.link
377 elif request.HasField("offline_information"):
378 online = False
379 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value
380 if not (
381 request.offline_information.address
382 and request.offline_information.lat
383 and request.offline_information.lng
384 ):
385 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
386 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
387 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
388 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
389 address = request.offline_information.address
390 link = None
391 else:
392 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)
394 start_time = to_aware_datetime(request.start_time)
395 end_time = to_aware_datetime(request.end_time)
397 _check_occurrence_time_validity(start_time, end_time, context)
399 if request.parent_community_id:
400 parent_node = session.execute(
401 select(Node).where(Node.id == request.parent_community_id)
402 ).scalar_one_or_none()
403 else:
404 if online:
405 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_MISSING_PARENT_COMMUNITY)
406 # parent community computed from geom
407 parent_node = get_parent_node_at_location(session, geom)
409 if not parent_node:
410 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)
412 if (
413 request.photo_key
414 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
415 ):
416 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
418 event = Event(
419 title=request.title,
420 parent_node_id=parent_node.id,
421 owner_user_id=context.user_id,
422 thread=Thread(),
423 creator_user_id=context.user_id,
424 )
425 session.add(event)
427 occurrence = EventOccurrence(
428 event=event,
429 content=request.content,
430 geom=geom,
431 address=address,
432 link=link,
433 photo_key=request.photo_key if request.photo_key != "" else None,
434 # timezone=timezone,
435 during=DateTimeTZRange(start_time, end_time),
436 creator_user_id=context.user_id,
437 )
438 session.add(occurrence)
440 session.add(
441 EventOrganizer(
442 user_id=context.user_id,
443 event=event,
444 )
445 )
447 session.add(
448 EventSubscription(
449 user_id=context.user_id,
450 event=event,
451 )
452 )
454 session.add(
455 EventOccurrenceAttendee(
456 user_id=context.user_id,
457 occurrence=occurrence,
458 attendee_status=AttendeeStatus.going,
459 )
460 )
462 session.commit()
464 if user.has_completed_profile:
465 queue_job(
466 session,
467 "generate_event_create_notifications",
468 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
469 inviting_user_id=user.id,
470 occurrence_id=occurrence.id,
471 approved=False,
472 ),
473 )
475 return event_to_pb(session, occurrence, context)
477 def ScheduleEvent(self, request, context, session):
478 if not request.content:
479 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
480 if request.HasField("online_information"):
481 geom = None
482 address = None
483 link = request.online_information.link
484 elif request.HasField("offline_information"):
485 if not (
486 request.offline_information.address
487 and request.offline_information.lat
488 and request.offline_information.lng
489 ):
490 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
491 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
492 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
493 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
494 address = request.offline_information.address
495 link = None
496 else:
497 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)
499 start_time = to_aware_datetime(request.start_time)
500 end_time = to_aware_datetime(request.end_time)
502 _check_occurrence_time_validity(start_time, end_time, context)
504 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
505 if not res:
506 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
508 event, occurrence = res
510 if not _can_edit_event(session, event, context.user_id):
511 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
513 if occurrence.is_cancelled:
514 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
516 if (
517 request.photo_key
518 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
519 ):
520 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
522 during = DateTimeTZRange(start_time, end_time)
524 # && is the overlap operator for ranges
525 if (
526 session.execute(
527 select(EventOccurrence.id)
528 .where(EventOccurrence.event_id == event.id)
529 .where(EventOccurrence.during.op("&&")(during))
530 .limit(1)
531 )
532 .scalars()
533 .one_or_none()
534 is not None
535 ):
536 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)
538 occurrence = EventOccurrence(
539 event=event,
540 content=request.content,
541 geom=geom,
542 address=address,
543 link=link,
544 photo_key=request.photo_key if request.photo_key != "" else None,
545 # timezone=timezone,
546 during=during,
547 creator_user_id=context.user_id,
548 )
549 session.add(occurrence)
551 session.add(
552 EventOccurrenceAttendee(
553 user_id=context.user_id,
554 occurrence=occurrence,
555 attendee_status=AttendeeStatus.going,
556 )
557 )
559 session.flush()
561 # TODO: notify
563 return event_to_pb(session, occurrence, context)
565 def UpdateEvent(self, request, context, session):
566 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
567 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
568 if not res:
569 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
571 event, occurrence = res
573 if not _can_edit_event(session, event, context.user_id):
574 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
576 # the things that were updated and need to be notified about
577 notify_updated = []
579 if occurrence.is_cancelled:
580 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
582 occurrence_update = {"last_edited": now()}
584 if request.HasField("title"):
585 notify_updated.append("title")
586 event.title = request.title.value
587 event.last_edited = now()
589 if request.HasField("content"):
590 notify_updated.append("content")
591 occurrence_update["content"] = request.content.value
593 if request.HasField("photo_key"):
594 occurrence_update["photo_key"] = request.photo_key.value
596 if request.HasField("online_information"):
597 notify_updated.append("location")
598 if not request.online_information.link:
599 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
600 occurrence_update["link"] = request.online_information.link
601 occurrence_update["geom"] = None
602 occurrence_update["address"] = None
603 elif request.HasField("offline_information"):
604 notify_updated.append("location")
605 occurrence_update["link"] = None
606 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
607 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
608 occurrence_update["geom"] = create_coordinate(
609 request.offline_information.lat, request.offline_information.lng
610 )
611 occurrence_update["address"] = request.offline_information.address
613 if request.HasField("start_time") or request.HasField("end_time"):
614 if request.update_all_future:
615 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_CANT_UPDATE_ALL_TIMES)
616 if request.HasField("start_time"):
617 notify_updated.append("start time")
618 start_time = to_aware_datetime(request.start_time)
619 else:
620 start_time = occurrence.start_time
621 if request.HasField("end_time"):
622 notify_updated.append("end time")
623 end_time = to_aware_datetime(request.end_time)
624 else:
625 end_time = occurrence.end_time
627 _check_occurrence_time_validity(start_time, end_time, context)
629 during = DateTimeTZRange(start_time, end_time)
631 # && is the overlap operator for ranges
632 if (
633 session.execute(
634 select(EventOccurrence.id)
635 .where(EventOccurrence.event_id == event.id)
636 .where(EventOccurrence.id != occurrence.id)
637 .where(EventOccurrence.during.op("&&")(during))
638 .limit(1)
639 )
640 .scalars()
641 .one_or_none()
642 is not None
643 ):
644 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)
646 occurrence_update["during"] = during
648 # TODO
649 # if request.HasField("timezone"):
650 # occurrence_update["timezone"] = request.timezone
652 # allow editing any event which hasn't ended more than 24 hours before now
653 # when editing all future events, we edit all which have not yet ended
655 if request.update_all_future:
656 session.execute(
657 update(EventOccurrence)
658 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
659 .where(EventOccurrence.start_time >= occurrence.start_time)
660 .values(occurrence_update)
661 .execution_options(synchronize_session=False)
662 )
663 else:
664 if occurrence.end_time < now() - timedelta(hours=24):
665 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
666 session.execute(
667 update(EventOccurrence)
668 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
669 .where(EventOccurrence.id == occurrence.id)
670 .values(occurrence_update)
671 .execution_options(synchronize_session=False)
672 )
674 session.flush()
676 if notify_updated:
677 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying")
679 queue_job(
680 session,
681 "generate_event_update_notifications",
682 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload(
683 updating_user_id=user.id,
684 occurrence_id=occurrence.id,
685 updated_items=notify_updated,
686 ),
687 )
689 # since we have synchronize_session=False, we have to refresh the object
690 session.refresh(occurrence)
692 return event_to_pb(session, occurrence, context)
694 def GetEvent(self, request, context, session):
695 occurrence = session.execute(
696 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
697 ).scalar_one_or_none()
699 if not occurrence:
700 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
702 return event_to_pb(session, occurrence, context)
704 def CancelEvent(self, request, context, session):
705 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
706 if not res:
707 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
709 event, occurrence = res
711 if not _can_edit_event(session, event, context.user_id):
712 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
714 if occurrence.end_time < now() - timedelta(hours=24):
715 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_CANCEL_OLD_EVENT)
717 occurrence.is_cancelled = True
719 queue_job(
720 session,
721 "generate_event_cancel_notifications",
722 payload=jobs_pb2.GenerateEventCancelNotificationsPayload(
723 cancelling_user_id=context.user_id,
724 occurrence_id=occurrence.id,
725 ),
726 )
728 return empty_pb2.Empty()
730 def RequestCommunityInvite(self, request, context, session):
731 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
732 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
733 if not res:
734 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
736 event, occurrence = res
738 if not _can_edit_event(session, event, context.user_id):
739 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
741 if occurrence.is_cancelled:
742 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
744 if occurrence.end_time < now() - timedelta(hours=24):
745 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
747 this_user_reqs = [req for req in occurrence.community_invite_requests if req.user_id == context.user_id]
749 if len(this_user_reqs) > 0:
750 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_REQUESTED)
752 approved_reqs = [req for req in occurrence.community_invite_requests if req.approved]
754 if len(approved_reqs) > 0:
755 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_APPROVED)
757 request = EventCommunityInviteRequest(
758 occurrence_id=request.event_id,
759 user_id=context.user_id,
760 )
761 session.add(request)
762 session.flush()
764 send_event_community_invite_request_email(session, request)
766 return empty_pb2.Empty()
768 def ListEventOccurrences(self, request, context, session):
769 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
770 # the page token is a unix timestamp of where we left off
771 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
772 occurrence = session.execute(
773 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
774 ).scalar_one_or_none()
775 if not occurrence:
776 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
778 occurrences = (
779 select(EventOccurrence).where(EventOccurrence.event_id == Event.id).where(~EventOccurrence.is_deleted)
780 )
782 if not request.include_cancelled:
783 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
785 if not request.past:
786 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
787 EventOccurrence.start_time.asc()
788 )
789 else:
790 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
791 EventOccurrence.start_time.desc()
792 )
794 occurrences = occurrences.limit(page_size + 1)
795 occurrences = session.execute(occurrences).scalars().all()
797 return events_pb2.ListEventOccurrencesRes(
798 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
799 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
800 )
802 def ListEventAttendees(self, request, context, session):
803 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
804 next_user_id = int(request.page_token) if request.page_token else 0
805 occurrence = session.execute(
806 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
807 ).scalar_one_or_none()
808 if not occurrence:
809 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
810 attendees = (
811 session.execute(
812 select(EventOccurrenceAttendee)
813 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
814 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
815 .where(EventOccurrenceAttendee.user_id >= next_user_id)
816 .order_by(EventOccurrenceAttendee.user_id)
817 .limit(page_size + 1)
818 )
819 .scalars()
820 .all()
821 )
822 return events_pb2.ListEventAttendeesRes(
823 attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]],
824 next_page_token=str(attendees[-1].user_id) if len(attendees) > page_size else None,
825 )
827 def ListEventSubscribers(self, request, context, session):
828 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
829 next_user_id = int(request.page_token) if request.page_token else 0
830 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
831 if not res:
832 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
833 event, occurrence = res
834 subscribers = (
835 session.execute(
836 select(EventSubscription)
837 .where_users_column_visible(context, EventSubscription.user_id)
838 .where(EventSubscription.event_id == event.id)
839 .where(EventSubscription.user_id >= next_user_id)
840 .order_by(EventSubscription.user_id)
841 .limit(page_size + 1)
842 )
843 .scalars()
844 .all()
845 )
846 return events_pb2.ListEventSubscribersRes(
847 subscriber_user_ids=[subscriber.user_id for subscriber in subscribers[:page_size]],
848 next_page_token=str(subscribers[-1].user_id) if len(subscribers) > page_size else None,
849 )
851 def ListEventOrganizers(self, request, context, session):
852 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
853 next_user_id = int(request.page_token) if request.page_token else 0
854 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
855 if not res:
856 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
857 event, occurrence = res
858 organizers = (
859 session.execute(
860 select(EventOrganizer)
861 .where_users_column_visible(context, EventOrganizer.user_id)
862 .where(EventOrganizer.event_id == event.id)
863 .where(EventOrganizer.user_id >= next_user_id)
864 .order_by(EventOrganizer.user_id)
865 .limit(page_size + 1)
866 )
867 .scalars()
868 .all()
869 )
870 return events_pb2.ListEventOrganizersRes(
871 organizer_user_ids=[organizer.user_id for organizer in organizers[:page_size]],
872 next_page_token=str(organizers[-1].user_id) if len(organizers) > page_size else None,
873 )
875 def TransferEvent(self, request, context, session):
876 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
877 if not res:
878 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
880 event, occurrence = res
882 if not _can_edit_event(session, event, context.user_id):
883 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_TRANSFER_PERMISSION_DENIED)
885 if occurrence.is_cancelled:
886 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
888 if occurrence.end_time < now() - timedelta(hours=24):
889 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
891 if request.WhichOneof("new_owner") == "new_owner_group_id":
892 cluster = session.execute(
893 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
894 ).scalar_one_or_none()
895 elif request.WhichOneof("new_owner") == "new_owner_community_id":
896 cluster = session.execute(
897 select(Cluster)
898 .where(Cluster.parent_node_id == request.new_owner_community_id)
899 .where(Cluster.is_official_cluster)
900 ).scalar_one_or_none()
902 if not cluster:
903 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
905 event.owner_user = None
906 event.owner_cluster = cluster
908 session.commit()
909 return event_to_pb(session, occurrence, context)
911 def SetEventSubscription(self, request, context, session):
912 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
913 if not res:
914 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
916 event, occurrence = res
918 if occurrence.is_cancelled:
919 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
921 if occurrence.end_time < now() - timedelta(hours=24):
922 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
924 current_subscription = session.execute(
925 select(EventSubscription)
926 .where(EventSubscription.user_id == context.user_id)
927 .where(EventSubscription.event_id == event.id)
928 ).scalar_one_or_none()
930 # if not subscribed, subscribe
931 if request.subscribe and not current_subscription:
932 session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
934 # if subscribed but unsubbing, remove subscription
935 if not request.subscribe and current_subscription:
936 session.delete(current_subscription)
938 session.flush()
940 return event_to_pb(session, occurrence, context)
942 def SetEventAttendance(self, request, context, session):
943 occurrence = session.execute(
944 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
945 ).scalar_one_or_none()
947 if not occurrence:
948 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
950 if occurrence.is_cancelled:
951 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
953 if occurrence.end_time < now() - timedelta(hours=24):
954 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
956 current_attendance = session.execute(
957 select(EventOccurrenceAttendee)
958 .where(EventOccurrenceAttendee.user_id == context.user_id)
959 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
960 ).scalar_one_or_none()
962 if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING:
963 if current_attendance:
964 session.delete(current_attendance)
965 # if unset/not going, nothing to do!
966 else:
967 if current_attendance:
968 current_attendance.attendee_status = attendancestate2sql[request.attendance_state]
969 else:
970 # create new
971 attendance = EventOccurrenceAttendee(
972 user_id=context.user_id,
973 occurrence_id=occurrence.id,
974 attendee_status=attendancestate2sql[request.attendance_state],
975 )
976 session.add(attendance)
978 session.flush()
980 return event_to_pb(session, occurrence, context)
982 def ListMyEvents(self, request, context, session):
983 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
984 # the page token is a unix timestamp of where we left off
985 page_token = (
986 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
987 )
988 # the page number is the page number we are on
989 page_number = request.page_number or 1
990 # Calculate the offset for pagination
991 offset = (page_number - 1) * page_size
992 occurrences = (
993 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
994 )
996 include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities)
997 include_subscribed = request.subscribed or include_all
998 include_organizing = request.organizing or include_all
999 include_attending = request.attending or include_all
1000 include_my_communities = request.my_communities or include_all
1002 where_ = []
1004 if include_subscribed:
1005 occurrences = occurrences.outerjoin(
1006 EventSubscription,
1007 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
1008 )
1009 where_.append(EventSubscription.user_id != None)
1010 if include_organizing:
1011 occurrences = occurrences.outerjoin(
1012 EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id)
1013 )
1014 where_.append(EventOrganizer.user_id != None)
1015 if include_attending:
1016 occurrences = occurrences.outerjoin(
1017 EventOccurrenceAttendee,
1018 and_(
1019 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
1020 EventOccurrenceAttendee.user_id == context.user_id,
1021 ),
1022 )
1023 where_.append(EventOccurrenceAttendee.user_id != None)
1024 if include_my_communities:
1025 my_communities = (
1026 session.execute(
1027 select(Node.id)
1028 .join(Cluster, Cluster.parent_node_id == Node.id)
1029 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
1030 .where(ClusterSubscription.user_id == context.user_id)
1031 .where(Cluster.is_official_cluster)
1032 .order_by(Node.id)
1033 .limit(100000)
1034 )
1035 .scalars()
1036 .all()
1037 )
1038 where_.append(Event.parent_node_id.in_(my_communities))
1040 occurrences = occurrences.where(or_(*where_))
1042 if not request.include_cancelled:
1043 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
1045 if not request.past:
1046 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
1047 EventOccurrence.start_time.asc()
1048 )
1049 else:
1050 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
1051 EventOccurrence.start_time.desc()
1052 )
1053 # Count the total number of items for pagination
1054 total_items = session.execute(select(func.count()).select_from(occurrences.subquery())).scalar()
1055 # Apply pagination by page number
1056 occurrences = (
1057 occurrences.offset(offset).limit(page_size) if request.page_number else occurrences.limit(page_size + 1)
1058 )
1059 occurrences = session.execute(occurrences).scalars().all()
1061 return events_pb2.ListMyEventsRes(
1062 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
1063 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
1064 total_items=total_items,
1065 )
1067 def ListAllEvents(self, request, context, session):
1068 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
1069 # the page token is a unix timestamp of where we left off
1070 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
1072 occurrences = (
1073 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
1074 )
1076 if not request.include_cancelled:
1077 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
1079 if not request.past:
1080 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
1081 EventOccurrence.start_time.asc()
1082 )
1083 else:
1084 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
1085 EventOccurrence.start_time.desc()
1086 )
1088 occurrences = occurrences.limit(page_size + 1)
1089 occurrences = session.execute(occurrences).scalars().all()
1091 return events_pb2.ListAllEventsRes(
1092 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
1093 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
1094 )
1096 def InviteEventOrganizer(self, request, context, session):
1097 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
1098 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
1099 if not res:
1100 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
1102 event, occurrence = res
1104 if not _can_edit_event(session, event, context.user_id):
1105 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
1107 if occurrence.is_cancelled:
1108 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
1110 if occurrence.end_time < now() - timedelta(hours=24):
1111 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
1113 if not session.execute(
1114 select(User).where_users_visible(context).where(User.id == request.user_id)
1115 ).scalar_one_or_none():
1116 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
1118 session.add(
1119 EventOrganizer(
1120 user_id=request.user_id,
1121 event=event,
1122 )
1123 )
1124 session.flush()
1126 other_user_context = make_user_context(user_id=request.user_id)
1128 notify(
1129 session,
1130 user_id=request.user_id,
1131 topic_action="event:invite_organizer",
1132 key=event.id,
1133 data=notification_data_pb2.EventInviteOrganizer(
1134 event=event_to_pb(session, occurrence, other_user_context),
1135 inviting_user=user_model_to_pb(user, session, other_user_context),
1136 ),
1137 )
1139 return empty_pb2.Empty()
1141 def RemoveEventOrganizer(self, request, context, session):
1142 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
1143 if not res:
1144 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
1146 event, occurrence = res
1148 if occurrence.is_cancelled:
1149 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
1151 if occurrence.end_time < now() - timedelta(hours=24):
1152 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
1154 if event.owner_user_id == context.user_id:
1155 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_REMOVE_OWNER_AS_ORGANIZER)
1157 current = session.execute(
1158 select(EventOrganizer)
1159 .where(EventOrganizer.user_id == context.user_id)
1160 .where(EventOrganizer.event_id == event.id)
1161 ).scalar_one_or_none()
1163 if not current:
1164 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_NOT_AN_ORGANIZER)
1166 session.delete(current)
1168 return empty_pb2.Empty()