Coverage for src/couchers/servicers/events.py: 83%
494 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-12-20 18:03 +0000
1import logging
2from datetime import timedelta
3from types import SimpleNamespace
5import grpc
6from google.protobuf import empty_pb2
7from psycopg2.extras import DateTimeTZRange
8from sqlalchemy.sql import and_, func, or_, select, update
10from couchers import errors
11from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope
12from couchers.jobs.enqueue import queue_job
13from couchers.models import (
14 AttendeeStatus,
15 Cluster,
16 ClusterSubscription,
17 Event,
18 EventCommunityInviteRequest,
19 EventOccurrence,
20 EventOccurrenceAttendee,
21 EventOrganizer,
22 EventSubscription,
23 Node,
24 Thread,
25 Upload,
26 User,
27)
28from couchers.notifications.notify import notify
29from couchers.servicers.api import user_model_to_pb
30from couchers.servicers.blocking import are_blocked
31from couchers.servicers.threads import thread_to_pb
32from couchers.sql import couchers_select as select
33from couchers.tasks import send_event_community_invite_request_email
34from couchers.utils import (
35 Timestamp_from_datetime,
36 create_coordinate,
37 dt_from_millis,
38 millis_from_dt,
39 now,
40 to_aware_datetime,
41)
42from proto import events_pb2, events_pb2_grpc, notification_data_pb2
43from proto.internal import jobs_pb2
45logger = logging.getLogger(__name__)
47attendancestate2sql = {
48 events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
49 events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
50 events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
51}
53attendancestate2api = {
54 None: events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING,
55 AttendeeStatus.going: events_pb2.AttendanceState.ATTENDANCE_STATE_GOING,
56 AttendeeStatus.maybe: events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE,
57}
59MAX_PAGINATION_LENGTH = 25
62def _is_event_owner(event: Event, user_id):
63 """
64 Checks whether the user can act as an owner of the event
65 """
66 if event.owner_user:
67 return event.owner_user_id == user_id
68 # otherwise owned by a cluster
69 return event.owner_cluster.admins.where(User.id == user_id).one_or_none() is not None
72def _can_moderate_event(session, event: Event, user_id):
73 # if the event is owned by a cluster, then any moderator of that cluster can moderate this event
74 if event.owner_cluster is not None and can_moderate_node(session, user_id, event.owner_cluster.parent_node_id):
75 return True
77 # finally check if the user can moderate the parent node of the cluster
78 return can_moderate_node(session, user_id, event.parent_node_id)
81def _can_edit_event(session, event, user_id):
82 return _is_event_owner(event, user_id) or _can_moderate_event(session, event, user_id)
85def event_to_pb(session, occurrence: EventOccurrence, context):
86 event = occurrence.event
88 next_occurrence = (
89 event.occurrences.where(EventOccurrence.end_time >= now()).order_by(EventOccurrence.end_time.asc()).first()
90 )
92 owner_community_id = None
93 owner_group_id = None
94 if event.owner_cluster:
95 if event.owner_cluster.is_official_cluster:
96 owner_community_id = event.owner_cluster.parent_node_id
97 else:
98 owner_group_id = event.owner_cluster.id
100 attendance = occurrence.attendances.where(EventOccurrenceAttendee.user_id == context.user_id).one_or_none()
101 attendance_state = attendance.attendee_status if attendance else None
103 can_moderate = _can_moderate_event(session, event, context.user_id)
104 can_edit = _can_edit_event(session, event, context.user_id)
106 going_count = session.execute(
107 select(func.count())
108 .select_from(EventOccurrenceAttendee)
109 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
110 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
111 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.going)
112 ).scalar_one()
113 maybe_count = session.execute(
114 select(func.count())
115 .select_from(EventOccurrenceAttendee)
116 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
117 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
118 .where(EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe)
119 ).scalar_one()
121 organizer_count = session.execute(
122 select(func.count())
123 .select_from(EventOrganizer)
124 .where_users_column_visible(context, EventOrganizer.user_id)
125 .where(EventOrganizer.event_id == event.id)
126 ).scalar_one()
127 subscriber_count = session.execute(
128 select(func.count())
129 .select_from(EventSubscription)
130 .where_users_column_visible(context, EventSubscription.user_id)
131 .where(EventSubscription.event_id == event.id)
132 ).scalar_one()
134 return events_pb2.Event(
135 event_id=occurrence.id,
136 is_next=False if not next_occurrence else occurrence.id == next_occurrence.id,
137 is_cancelled=occurrence.is_cancelled,
138 is_deleted=occurrence.is_deleted,
139 title=event.title,
140 slug=event.slug,
141 content=occurrence.content,
142 photo_url=occurrence.photo.full_url if occurrence.photo else None,
143 online_information=(
144 events_pb2.OnlineEventInformation(
145 link=occurrence.link,
146 )
147 if occurrence.link
148 else None
149 ),
150 offline_information=(
151 events_pb2.OfflineEventInformation(
152 lat=occurrence.coordinates[0],
153 lng=occurrence.coordinates[1],
154 address=occurrence.address,
155 )
156 if occurrence.geom
157 else None
158 ),
159 created=Timestamp_from_datetime(occurrence.created),
160 last_edited=Timestamp_from_datetime(occurrence.last_edited),
161 creator_user_id=occurrence.creator_user_id,
162 start_time=Timestamp_from_datetime(occurrence.start_time),
163 end_time=Timestamp_from_datetime(occurrence.end_time),
164 timezone=occurrence.timezone,
165 start_time_display=str(occurrence.start_time),
166 end_time_display=str(occurrence.end_time),
167 attendance_state=attendancestate2api[attendance_state],
168 organizer=event.organizers.where(EventOrganizer.user_id == context.user_id).one_or_none() is not None,
169 subscriber=event.subscribers.where(EventSubscription.user_id == context.user_id).one_or_none() is not None,
170 going_count=going_count,
171 maybe_count=maybe_count,
172 organizer_count=organizer_count,
173 subscriber_count=subscriber_count,
174 owner_user_id=event.owner_user_id,
175 owner_community_id=owner_community_id,
176 owner_group_id=owner_group_id,
177 thread=thread_to_pb(session, event.thread_id),
178 can_edit=can_edit,
179 can_moderate=can_moderate,
180 )
183def _get_event_and_occurrence_query(occurrence_id, include_deleted: bool):
184 query = (
185 select(Event, EventOccurrence)
186 .where(EventOccurrence.id == occurrence_id)
187 .where(EventOccurrence.event_id == Event.id)
188 )
190 if not include_deleted:
191 query = query.where(~EventOccurrence.is_deleted)
193 return query
196def _get_event_and_occurrence_one(
197 session, occurrence_id, include_deleted: bool = False
198) -> tuple[Event, EventOccurrence]:
199 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one()
202def _get_event_and_occurrence_one_or_none(
203 session, occurrence_id, include_deleted: bool = False
204) -> tuple[Event, EventOccurrence] | None:
205 return session.execute(_get_event_and_occurrence_query(occurrence_id, include_deleted)).one_or_none()
208def _check_occurrence_time_validity(start_time, end_time, context):
209 if start_time < now():
210 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_IN_PAST)
211 if end_time < start_time:
212 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_ENDS_BEFORE_STARTS)
213 if end_time - start_time > timedelta(days=7):
214 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_LONG)
215 if start_time - now() > timedelta(days=365):
216 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_TOO_FAR_IN_FUTURE)
219def get_users_to_notify_for_new_event(session, occurrence):
220 """
221 Returns the users to notify, as well as the community id that is being notified (None if based on geo search)
222 """
223 cluster = occurrence.event.parent_node.official_cluster
224 if cluster.parent_node_id == 1:
225 logger.info("The Global Community is too big for email notifications.")
226 return [], occurrence.event.parent_node_id
227 elif occurrence.creator_user in cluster.admins or cluster.is_leaf:
228 return list(cluster.members.where(User.is_visible)), occurrence.event.parent_node_id
229 else:
230 max_radius = 20000 # m
231 users = (
232 session.execute(
233 select(User)
234 .join(ClusterSubscription, ClusterSubscription.user_id == User.id)
235 .where(User.is_visible)
236 .where(ClusterSubscription.cluster_id == cluster.id)
237 .where(func.ST_DWithin(User.geom, occurrence.geom, max_radius / 111111))
238 )
239 .scalars()
240 .all()
241 )
242 return users, None
245def generate_event_create_notifications(payload: jobs_pb2.GenerateEventCreateNotificationsPayload):
246 """
247 Background job to generated/fan out event notifications
248 """
249 from couchers.servicers.communities import community_to_pb
251 logger.info(f"Fanning out notifications for event occurrence id = {payload.occurrence_id}")
253 with session_scope() as session:
254 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
255 creator = occurrence.creator_user
257 users, node_id = get_users_to_notify_for_new_event(session, occurrence)
259 inviting_user = session.execute(select(User).where(User.id == payload.inviting_user_id)).scalar_one_or_none()
261 if not inviting_user:
262 logger.error(f"Inviting user {payload.inviting_user_id} is gone while trying to send event notification?")
263 return
265 for user in users:
266 if are_blocked(session, user.id, creator.id):
267 continue
268 context = SimpleNamespace(user_id=user.id)
269 notify(
270 session,
271 user_id=user.id,
272 topic_action="event:create_approved" if payload.approved else "event:create_any",
273 key=payload.occurrence_id,
274 data=notification_data_pb2.EventCreate(
275 event=event_to_pb(session, occurrence, context),
276 inviting_user=user_model_to_pb(inviting_user, session, context),
277 nearby=True if node_id is None else None,
278 in_community=community_to_pb(session, event.parent_node, context) if node_id is not None else None,
279 ),
280 )
283def generate_event_update_notifications(payload: jobs_pb2.GenerateEventUpdateNotificationsPayload):
284 with session_scope() as session:
285 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
287 updating_user = session.execute(select(User).where(User.id == payload.updating_user_id)).scalar_one_or_none()
289 subscribed_user_ids = [user.id for user in event.subscribers]
290 attending_user_ids = [user.user_id for user in occurrence.attendances]
292 for user_id in set(subscribed_user_ids + attending_user_ids):
293 logger.info(user_id)
294 if are_blocked(session, user_id, updating_user.id):
295 continue
296 context = SimpleNamespace(user_id=user_id)
297 notify(
298 session,
299 user_id=user_id,
300 topic_action="event:update",
301 key=payload.occurrence_id,
302 data=notification_data_pb2.EventUpdate(
303 event=event_to_pb(session, occurrence, context),
304 updating_user=user_model_to_pb(updating_user, session, context),
305 updated_items=payload.updated_items,
306 ),
307 )
310def generate_event_cancel_notifications(payload: jobs_pb2.GenerateEventCancelNotificationsPayload):
311 with session_scope() as session:
312 event, occurrence = _get_event_and_occurrence_one(session, occurrence_id=payload.occurrence_id)
314 cancelling_user = session.execute(
315 select(User).where(User.id == payload.cancelling_user_id)
316 ).scalar_one_or_none()
318 subscribed_user_ids = [user.id for user in event.subscribers]
319 attending_user_ids = [user.user_id for user in occurrence.attendances]
321 for user_id in set(subscribed_user_ids + attending_user_ids):
322 logger.info(user_id)
323 if are_blocked(session, user_id, cancelling_user.id):
324 continue
325 context = SimpleNamespace(user_id=user_id)
326 notify(
327 session,
328 user_id=user_id,
329 topic_action="event:cancel",
330 key=payload.occurrence_id,
331 data=notification_data_pb2.EventCancel(
332 event=event_to_pb(session, occurrence, context),
333 cancelling_user=user_model_to_pb(cancelling_user, session, context),
334 ),
335 )
338def generate_event_delete_notifications(payload: jobs_pb2.GenerateEventDeleteNotificationsPayload):
339 with session_scope() as session:
340 event, occurrence = _get_event_and_occurrence_one(
341 session, occurrence_id=payload.occurrence_id, include_deleted=True
342 )
344 subscribed_user_ids = [user.id for user in event.subscribers]
345 attending_user_ids = [user.user_id for user in occurrence.attendances]
347 for user_id in set(subscribed_user_ids + attending_user_ids):
348 logger.info(user_id)
349 context = SimpleNamespace(user_id=user_id)
350 notify(
351 session,
352 user_id=user_id,
353 topic_action="event:delete",
354 key=payload.occurrence_id,
355 data=notification_data_pb2.EventDelete(
356 event=event_to_pb(session, occurrence, context),
357 ),
358 )
361class Events(events_pb2_grpc.EventsServicer):
362 def CreateEvent(self, request, context, session):
363 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
364 if not user.has_completed_profile:
365 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.INCOMPLETE_PROFILE_CREATE_EVENT)
366 if not request.title:
367 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_TITLE)
368 if not request.content:
369 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
370 if request.HasField("online_information"):
371 online = True
372 geom = None
373 address = None
374 if not request.online_information.link:
375 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
376 link = request.online_information.link
377 elif request.HasField("offline_information"):
378 online = False
379 # As protobuf parses a missing value as 0.0, this is not a permitted event coordinate value
380 if not (
381 request.offline_information.address
382 and request.offline_information.lat
383 and request.offline_information.lng
384 ):
385 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
386 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
387 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
388 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
389 address = request.offline_information.address
390 link = None
391 else:
392 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)
394 start_time = to_aware_datetime(request.start_time)
395 end_time = to_aware_datetime(request.end_time)
397 _check_occurrence_time_validity(start_time, end_time, context)
399 if request.parent_community_id:
400 parent_node = session.execute(
401 select(Node).where(Node.id == request.parent_community_id)
402 ).scalar_one_or_none()
403 else:
404 if online:
405 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_MISSING_PARENT_COMMUNITY)
406 # parent community computed from geom
407 parent_node = get_parent_node_at_location(session, geom)
409 if not parent_node:
410 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND)
412 if (
413 request.photo_key
414 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
415 ):
416 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
418 event = Event(
419 title=request.title,
420 parent_node_id=parent_node.id,
421 owner_user_id=context.user_id,
422 thread=Thread(),
423 creator_user_id=context.user_id,
424 )
425 session.add(event)
427 occurrence = EventOccurrence(
428 event=event,
429 content=request.content,
430 geom=geom,
431 address=address,
432 link=link,
433 photo_key=request.photo_key if request.photo_key != "" else None,
434 # timezone=timezone,
435 during=DateTimeTZRange(start_time, end_time),
436 creator_user_id=context.user_id,
437 )
438 session.add(occurrence)
440 session.add(
441 EventOrganizer(
442 user_id=context.user_id,
443 event=event,
444 )
445 )
447 session.add(
448 EventSubscription(
449 user_id=context.user_id,
450 event=event,
451 )
452 )
454 session.add(
455 EventOccurrenceAttendee(
456 user_id=context.user_id,
457 occurrence=occurrence,
458 attendee_status=AttendeeStatus.going,
459 )
460 )
462 session.commit()
464 if user.has_completed_profile:
465 queue_job(
466 session,
467 "generate_event_create_notifications",
468 payload=jobs_pb2.GenerateEventCreateNotificationsPayload(
469 inviting_user_id=user.id,
470 occurrence_id=occurrence.id,
471 approved=False,
472 ),
473 )
475 return event_to_pb(session, occurrence, context)
477 def ScheduleEvent(self, request, context, session):
478 if not request.content:
479 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
480 if request.HasField("online_information"):
481 geom = None
482 address = None
483 link = request.online_information.link
484 elif request.HasField("offline_information"):
485 if not (
486 request.offline_information.address
487 and request.offline_information.lat
488 and request.offline_information.lng
489 ):
490 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
491 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
492 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
493 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
494 address = request.offline_information.address
495 link = None
496 else:
497 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)
499 start_time = to_aware_datetime(request.start_time)
500 end_time = to_aware_datetime(request.end_time)
502 _check_occurrence_time_validity(start_time, end_time, context)
504 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
505 if not res:
506 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
508 event, occurrence = res
510 if not _can_edit_event(session, event, context.user_id):
511 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
513 if occurrence.is_cancelled:
514 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
516 if (
517 request.photo_key
518 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
519 ):
520 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)
522 during = DateTimeTZRange(start_time, end_time)
524 # && is the overlap operator for ranges
525 if (
526 session.execute(
527 select(EventOccurrence.id)
528 .where(EventOccurrence.event_id == event.id)
529 .where(EventOccurrence.during.op("&&")(during))
530 )
531 .scalars()
532 .first()
533 is not None
534 ):
535 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)
537 occurrence = EventOccurrence(
538 event=event,
539 content=request.content,
540 geom=geom,
541 address=address,
542 link=link,
543 photo_key=request.photo_key if request.photo_key != "" else None,
544 # timezone=timezone,
545 during=during,
546 creator_user_id=context.user_id,
547 )
548 session.add(occurrence)
550 session.add(
551 EventOccurrenceAttendee(
552 user_id=context.user_id,
553 occurrence=occurrence,
554 attendee_status=AttendeeStatus.going,
555 )
556 )
558 session.flush()
560 # TODO: notify
562 return event_to_pb(session, occurrence, context)
564 def UpdateEvent(self, request, context, session):
565 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
566 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
567 if not res:
568 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
570 event, occurrence = res
572 if not _can_edit_event(session, event, context.user_id):
573 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
575 # the things that were updated and need to be notified about
576 notify_updated = []
578 if occurrence.is_cancelled:
579 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
581 occurrence_update = {"last_edited": now()}
583 if request.HasField("title"):
584 notify_updated.append("title")
585 event.title = request.title.value
586 event.last_edited = now()
588 if request.HasField("content"):
589 notify_updated.append("content")
590 occurrence_update["content"] = request.content.value
592 if request.HasField("photo_key"):
593 occurrence_update["photo_key"] = request.photo_key.value
595 if request.HasField("online_information"):
596 notify_updated.append("location")
597 if not request.online_information.link:
598 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
599 occurrence_update["link"] = request.online_information.link
600 occurrence_update["geom"] = None
601 occurrence_update["address"] = None
602 elif request.HasField("offline_information"):
603 notify_updated.append("location")
604 occurrence_update["link"] = None
605 if request.offline_information.lat == 0 and request.offline_information.lng == 0:
606 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
607 occurrence_update["geom"] = create_coordinate(
608 request.offline_information.lat, request.offline_information.lng
609 )
610 occurrence_update["address"] = request.offline_information.address
612 if request.HasField("start_time") or request.HasField("end_time"):
613 if request.update_all_future:
614 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_CANT_UPDATE_ALL_TIMES)
615 if request.HasField("start_time"):
616 notify_updated.append("start time")
617 start_time = to_aware_datetime(request.start_time)
618 else:
619 start_time = occurrence.start_time
620 if request.HasField("end_time"):
621 notify_updated.append("end time")
622 end_time = to_aware_datetime(request.end_time)
623 else:
624 end_time = occurrence.end_time
626 _check_occurrence_time_validity(start_time, end_time, context)
628 during = DateTimeTZRange(start_time, end_time)
630 # && is the overlap operator for ranges
631 if (
632 session.execute(
633 select(EventOccurrence.id)
634 .where(EventOccurrence.event_id == event.id)
635 .where(EventOccurrence.id != occurrence.id)
636 .where(EventOccurrence.during.op("&&")(during))
637 )
638 .scalars()
639 .first()
640 is not None
641 ):
642 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)
644 occurrence_update["during"] = during
646 # TODO
647 # if request.HasField("timezone"):
648 # occurrence_update["timezone"] = request.timezone
650 # allow editing any event which hasn't ended more than 24 hours before now
651 # when editing all future events, we edit all which have not yet ended
653 if request.update_all_future:
654 session.execute(
655 update(EventOccurrence)
656 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
657 .where(EventOccurrence.start_time >= occurrence.start_time)
658 .values(occurrence_update)
659 .execution_options(synchronize_session=False)
660 )
661 else:
662 if occurrence.end_time < now() - timedelta(hours=24):
663 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
664 session.execute(
665 update(EventOccurrence)
666 .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
667 .where(EventOccurrence.id == occurrence.id)
668 .values(occurrence_update)
669 .execution_options(synchronize_session=False)
670 )
672 session.flush()
674 if notify_updated:
675 logger.info(f"Fields {','.join(notify_updated)} updated in event {event.id=}, notifying")
677 queue_job(
678 session,
679 "generate_event_update_notifications",
680 payload=jobs_pb2.GenerateEventUpdateNotificationsPayload(
681 updating_user_id=user.id,
682 occurrence_id=occurrence.id,
683 updated_items=notify_updated,
684 ),
685 )
687 # since we have synchronize_session=False, we have to refresh the object
688 session.refresh(occurrence)
690 return event_to_pb(session, occurrence, context)
692 def GetEvent(self, request, context, session):
693 occurrence = session.execute(
694 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
695 ).scalar_one_or_none()
697 if not occurrence:
698 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
700 return event_to_pb(session, occurrence, context)
702 def CancelEvent(self, request, context, session):
703 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
704 if not res:
705 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
707 event, occurrence = res
709 if not _can_edit_event(session, event, context.user_id):
710 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
712 if occurrence.end_time < now() - timedelta(hours=24):
713 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_CANCEL_OLD_EVENT)
715 occurrence.is_cancelled = True
717 queue_job(
718 session,
719 "generate_event_cancel_notifications",
720 payload=jobs_pb2.GenerateEventCancelNotificationsPayload(
721 cancelling_user_id=context.user_id,
722 occurrence_id=occurrence.id,
723 ),
724 )
726 return empty_pb2.Empty()
728 def RequestCommunityInvite(self, request, context, session):
729 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
730 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
731 if not res:
732 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
734 event, occurrence = res
736 if not _can_edit_event(session, event, context.user_id):
737 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
739 if occurrence.is_cancelled:
740 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
742 if occurrence.end_time < now() - timedelta(hours=24):
743 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
745 this_user_reqs = [req for req in occurrence.community_invite_requests if req.user_id == context.user_id]
747 if len(this_user_reqs) > 0:
748 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_REQUESTED)
750 approved_reqs = [req for req in occurrence.community_invite_requests if req.approved]
752 if len(approved_reqs) > 0:
753 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_APPROVED)
755 request = EventCommunityInviteRequest(
756 occurrence_id=request.event_id,
757 user_id=context.user_id,
758 )
759 session.add(request)
760 session.flush()
762 send_event_community_invite_request_email(session, request)
764 return empty_pb2.Empty()
766 def ListEventOccurrences(self, request, context, session):
767 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
768 # the page token is a unix timestamp of where we left off
769 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
770 occurrence = session.execute(
771 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
772 ).scalar_one_or_none()
773 if not occurrence:
774 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
776 occurrences = (
777 select(EventOccurrence).where(EventOccurrence.event_id == Event.id).where(~EventOccurrence.is_deleted)
778 )
780 if not request.include_cancelled:
781 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
783 if not request.past:
784 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
785 EventOccurrence.start_time.asc()
786 )
787 else:
788 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
789 EventOccurrence.start_time.desc()
790 )
792 occurrences = occurrences.limit(page_size + 1)
793 occurrences = session.execute(occurrences).scalars().all()
795 return events_pb2.ListEventOccurrencesRes(
796 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
797 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
798 )
800 def ListEventAttendees(self, request, context, session):
801 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
802 next_user_id = int(request.page_token) if request.page_token else 0
803 occurrence = session.execute(
804 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
805 ).scalar_one_or_none()
806 if not occurrence:
807 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
808 attendees = (
809 session.execute(
810 select(EventOccurrenceAttendee)
811 .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
812 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
813 .where(EventOccurrenceAttendee.user_id >= next_user_id)
814 .order_by(EventOccurrenceAttendee.user_id)
815 .limit(page_size + 1)
816 )
817 .scalars()
818 .all()
819 )
820 return events_pb2.ListEventAttendeesRes(
821 attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]],
822 next_page_token=str(attendees[-1].user_id) if len(attendees) > page_size else None,
823 )
825 def ListEventSubscribers(self, request, context, session):
826 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
827 next_user_id = int(request.page_token) if request.page_token else 0
828 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
829 if not res:
830 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
831 event, occurrence = res
832 subscribers = (
833 session.execute(
834 select(EventSubscription)
835 .where_users_column_visible(context, EventSubscription.user_id)
836 .where(EventSubscription.event_id == event.id)
837 .where(EventSubscription.user_id >= next_user_id)
838 .order_by(EventSubscription.user_id)
839 .limit(page_size + 1)
840 )
841 .scalars()
842 .all()
843 )
844 return events_pb2.ListEventSubscribersRes(
845 subscriber_user_ids=[subscriber.user_id for subscriber in subscribers[:page_size]],
846 next_page_token=str(subscribers[-1].user_id) if len(subscribers) > page_size else None,
847 )
849 def ListEventOrganizers(self, request, context, session):
850 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
851 next_user_id = int(request.page_token) if request.page_token else 0
852 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
853 if not res:
854 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
855 event, occurrence = res
856 organizers = (
857 session.execute(
858 select(EventOrganizer)
859 .where_users_column_visible(context, EventOrganizer.user_id)
860 .where(EventOrganizer.event_id == event.id)
861 .where(EventOrganizer.user_id >= next_user_id)
862 .order_by(EventOrganizer.user_id)
863 .limit(page_size + 1)
864 )
865 .scalars()
866 .all()
867 )
868 return events_pb2.ListEventOrganizersRes(
869 organizer_user_ids=[organizer.user_id for organizer in organizers[:page_size]],
870 next_page_token=str(organizers[-1].user_id) if len(organizers) > page_size else None,
871 )
873 def TransferEvent(self, request, context, session):
874 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
875 if not res:
876 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
878 event, occurrence = res
880 if not _can_edit_event(session, event, context.user_id):
881 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_TRANSFER_PERMISSION_DENIED)
883 if occurrence.is_cancelled:
884 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
886 if occurrence.end_time < now() - timedelta(hours=24):
887 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
889 if request.WhichOneof("new_owner") == "new_owner_group_id":
890 cluster = session.execute(
891 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
892 ).scalar_one_or_none()
893 elif request.WhichOneof("new_owner") == "new_owner_community_id":
894 cluster = session.execute(
895 select(Cluster)
896 .where(Cluster.parent_node_id == request.new_owner_community_id)
897 .where(Cluster.is_official_cluster)
898 ).scalar_one_or_none()
900 if not cluster:
901 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)
903 event.owner_user = None
904 event.owner_cluster = cluster
906 session.commit()
907 return event_to_pb(session, occurrence, context)
909 def SetEventSubscription(self, request, context, session):
910 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
911 if not res:
912 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
914 event, occurrence = res
916 if occurrence.is_cancelled:
917 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
919 if occurrence.end_time < now() - timedelta(hours=24):
920 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
922 current_subscription = session.execute(
923 select(EventSubscription)
924 .where(EventSubscription.user_id == context.user_id)
925 .where(EventSubscription.event_id == event.id)
926 ).scalar_one_or_none()
928 # if not subscribed, subscribe
929 if request.subscribe and not current_subscription:
930 session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
932 # if subscribed but unsubbing, remove subscription
933 if not request.subscribe and current_subscription:
934 session.delete(current_subscription)
936 session.flush()
938 return event_to_pb(session, occurrence, context)
940 def SetEventAttendance(self, request, context, session):
941 occurrence = session.execute(
942 select(EventOccurrence).where(EventOccurrence.id == request.event_id).where(~EventOccurrence.is_deleted)
943 ).scalar_one_or_none()
945 if not occurrence:
946 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
948 if occurrence.is_cancelled:
949 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
951 if occurrence.end_time < now() - timedelta(hours=24):
952 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
954 current_attendance = session.execute(
955 select(EventOccurrenceAttendee)
956 .where(EventOccurrenceAttendee.user_id == context.user_id)
957 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
958 ).scalar_one_or_none()
960 if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING:
961 if current_attendance:
962 session.delete(current_attendance)
963 # if unset/not going, nothing to do!
964 else:
965 if current_attendance:
966 current_attendance.attendee_status = attendancestate2sql[request.attendance_state]
967 else:
968 # create new
969 attendance = EventOccurrenceAttendee(
970 user_id=context.user_id,
971 occurrence_id=occurrence.id,
972 attendee_status=attendancestate2sql[request.attendance_state],
973 )
974 session.add(attendance)
976 session.flush()
978 return event_to_pb(session, occurrence, context)
980 def ListMyEvents(self, request, context, session):
981 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
982 # the page token is a unix timestamp of where we left off
983 page_token = (
984 dt_from_millis(int(request.page_token)) if request.page_token and not request.page_number else now()
985 )
986 # the page number is the page number we are on
987 page_number = request.page_number or 1
988 # Calculate the offset for pagination
989 offset = (page_number - 1) * page_size
990 occurrences = (
991 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
992 )
994 include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities)
995 include_subscribed = request.subscribed or include_all
996 include_organizing = request.organizing or include_all
997 include_attending = request.attending or include_all
998 include_my_communities = request.my_communities or include_all
1000 where_ = []
1002 if include_subscribed:
1003 occurrences = occurrences.outerjoin(
1004 EventSubscription,
1005 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
1006 )
1007 where_.append(EventSubscription.user_id != None)
1008 if include_organizing:
1009 occurrences = occurrences.outerjoin(
1010 EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id)
1011 )
1012 where_.append(EventOrganizer.user_id != None)
1013 if include_attending:
1014 occurrences = occurrences.outerjoin(
1015 EventOccurrenceAttendee,
1016 and_(
1017 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
1018 EventOccurrenceAttendee.user_id == context.user_id,
1019 ),
1020 )
1021 where_.append(EventOccurrenceAttendee.user_id != None)
1022 if include_my_communities:
1023 my_communities = (
1024 session.execute(
1025 select(Node.id)
1026 .join(Cluster, Cluster.parent_node_id == Node.id)
1027 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
1028 .where(ClusterSubscription.user_id == context.user_id)
1029 .where(Cluster.is_official_cluster)
1030 .order_by(Node.id)
1031 .limit(100000)
1032 )
1033 .scalars()
1034 .all()
1035 )
1036 where_.append(Event.parent_node_id.in_(my_communities))
1038 occurrences = occurrences.where(or_(*where_))
1040 if not request.include_cancelled:
1041 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
1043 if not request.past:
1044 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
1045 EventOccurrence.start_time.asc()
1046 )
1047 else:
1048 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
1049 EventOccurrence.start_time.desc()
1050 )
1051 # Count the total number of items for pagination
1052 total_items = session.execute(select(func.count()).select_from(occurrences.subquery())).scalar()
1053 # Apply pagination by page number
1054 occurrences = (
1055 occurrences.offset(offset).limit(page_size) if request.page_number else occurrences.limit(page_size + 1)
1056 )
1057 occurrences = session.execute(occurrences).scalars().all()
1059 return events_pb2.ListMyEventsRes(
1060 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
1061 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
1062 total_items=total_items,
1063 )
1065 def ListAllEvents(self, request, context, session):
1066 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
1067 # the page token is a unix timestamp of where we left off
1068 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
1070 occurrences = (
1071 select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id).where(~EventOccurrence.is_deleted)
1072 )
1074 if not request.include_cancelled:
1075 occurrences = occurrences.where(~EventOccurrence.is_cancelled)
1077 if not request.past:
1078 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
1079 EventOccurrence.start_time.asc()
1080 )
1081 else:
1082 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
1083 EventOccurrence.start_time.desc()
1084 )
1086 occurrences = occurrences.limit(page_size + 1)
1087 occurrences = session.execute(occurrences).scalars().all()
1089 return events_pb2.ListAllEventsRes(
1090 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
1091 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
1092 )
1094 def InviteEventOrganizer(self, request, context, session):
1095 user = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
1096 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
1097 if not res:
1098 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
1100 event, occurrence = res
1102 if not _can_edit_event(session, event, context.user_id):
1103 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)
1105 if occurrence.is_cancelled:
1106 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
1108 if occurrence.end_time < now() - timedelta(hours=24):
1109 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
1111 if not session.execute(
1112 select(User).where_users_visible(context).where(User.id == request.user_id)
1113 ).scalar_one_or_none():
1114 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
1116 session.add(
1117 EventOrganizer(
1118 user_id=request.user_id,
1119 event=event,
1120 )
1121 )
1122 session.flush()
1124 other_user_context = SimpleNamespace(user_id=request.user_id)
1126 notify(
1127 session,
1128 user_id=request.user_id,
1129 topic_action="event:invite_organizer",
1130 key=event.id,
1131 data=notification_data_pb2.EventInviteOrganizer(
1132 event=event_to_pb(session, occurrence, other_user_context),
1133 inviting_user=user_model_to_pb(user, session, other_user_context),
1134 ),
1135 )
1137 return empty_pb2.Empty()
1139 def RemoveEventOrganizer(self, request, context, session):
1140 res = _get_event_and_occurrence_one_or_none(session, occurrence_id=request.event_id)
1141 if not res:
1142 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
1144 event, occurrence = res
1146 if occurrence.is_cancelled:
1147 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_CANT_UPDATE_CANCELLED_EVENT)
1149 if occurrence.end_time < now() - timedelta(hours=24):
1150 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
1152 if event.owner_user_id == context.user_id:
1153 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_REMOVE_OWNER_AS_ORGANIZER)
1155 current = session.execute(
1156 select(EventOrganizer)
1157 .where(EventOrganizer.user_id == context.user_id)
1158 .where(EventOrganizer.event_id == event.id)
1159 ).scalar_one_or_none()
1161 if not current:
1162 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_NOT_AN_ORGANIZER)
1164 session.delete(current)
1166 return empty_pb2.Empty()