Coverage for src/couchers/servicers/events.py: 91%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from datetime import timedelta
3import grpc
4from google.protobuf import empty_pb2
5from psycopg2.extras import DateTimeTZRange
6from sqlalchemy.sql import and_, func, or_, update
8from couchers import errors
9from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope
10from couchers.models import (
11 AttendeeStatus,
12 Cluster,
13 ClusterSubscription,
14 Event,
15 EventOccurrence,
16 EventOccurrenceAttendee,
17 EventOrganizer,
18 EventSubscription,
19 Node,
20 Thread,
21 Upload,
22 User,
23)
24from couchers.servicers.threads import thread_to_pb
25from couchers.sql import couchers_select as select
26from couchers.utils import (
27 Timestamp_from_datetime,
28 create_coordinate,
29 dt_from_millis,
30 millis_from_dt,
31 now,
32 to_aware_datetime,
33)
34from proto import events_pb2, events_pb2_grpc
# API attendance state -> database attendee status. "Not going" has no stored
# status, so it maps to None.
attendancestate2sql = {
    events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
    events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
    events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
}

# Database attendee status -> API attendance state: the exact inverse of the
# mapping above (None, i.e. no attendance row, is reported as "not going").
attendancestate2api = {sql_status: api_state for api_state, sql_status in attendancestate2sql.items()}

# Upper bound on the page size of every paginated listing in this module.
MAX_PAGINATION_LENGTH = 25
def _is_event_owner(event: Event, user_id):
    """
    Tells whether the given user may act as an owner of the event.

    When the event has an owning user, only that user qualifies. Otherwise the
    event is owned by a cluster and any admin of that cluster qualifies.
    """
    if not event.owner_user:
        # no owning user, so ownership is decided by the cluster's admin list
        matching_admin = event.owner_cluster.admins.where(User.id == user_id).one_or_none()
        return matching_admin is not None
    return event.owner_user_id == user_id
def _can_moderate_event(session, event: Event, user_id):
    """
    Tells whether the given user may moderate the event.

    Moderation rights come either from being able to moderate the parent node
    of the cluster that owns the event (if it is cluster-owned), or from being
    able to moderate the event's own parent node.
    """
    owning_cluster = event.owner_cluster
    if owning_cluster is not None and can_moderate_node(session, user_id, owning_cluster.parent_node_id):
        # moderators of the owning cluster's node can moderate its events
        return True

    # fall back to the node the event itself is attached to
    return can_moderate_node(session, user_id, event.parent_node_id)
def _can_edit_event(session, event, user_id):
    """
    Tells whether the given user may edit the event: owners always can, and so
    can anyone with moderation rights over it.
    """
    if _is_event_owner(event, user_id):
        return True
    return _can_moderate_event(session, event, user_id)
def event_to_pb(session, occurrence: EventOccurrence, context):
    """
    Builds the `events_pb2.Event` message for one occurrence, as seen by the
    calling user (`context.user_id`).

    Besides the occurrence's own fields this includes the caller's attendance
    state, whether the caller is an organizer/subscriber, counts of visible
    attendees/organizers/subscribers, the owner of the event, and the caller's
    edit/moderation rights.
    """
    event = occurrence.event
    viewer_id = context.user_id

    def count_visible(model, user_column, *criteria):
        # counts rows of `model` matching `criteria`, restricted by the
        # visibility filter applied to `user_column`
        query = select(func.count()).select_from(model).where_users_column_visible(context, user_column)
        for criterion in criteria:
            query = query.where(criterion)
        return session.execute(query).scalar_one()

    # the occurrence of this event that is the first to end from now on, if any
    next_occurrence = (
        event.occurrences.where(EventOccurrence.end_time >= now()).order_by(EventOccurrence.end_time.asc()).first()
    )
    if next_occurrence:
        is_next = occurrence.id == next_occurrence.id
    else:
        is_next = False

    # a cluster-owned event is reported as community-owned when the cluster is
    # official, and as group-owned otherwise
    owner_community_id = None
    owner_group_id = None
    owning_cluster = event.owner_cluster
    if owning_cluster:
        if owning_cluster.is_official_cluster:
            owner_community_id = owning_cluster.parent_node_id
        else:
            owner_group_id = owning_cluster.id

    own_attendance = occurrence.attendees.where(EventOccurrenceAttendee.user_id == viewer_id).one_or_none()
    attendance_state = own_attendance.attendee_status if own_attendance else None

    can_moderate = _can_moderate_event(session, event, viewer_id)

    going_count = count_visible(
        EventOccurrenceAttendee,
        EventOccurrenceAttendee.user_id,
        EventOccurrenceAttendee.occurrence_id == occurrence.id,
        EventOccurrenceAttendee.attendee_status == AttendeeStatus.going,
    )
    maybe_count = count_visible(
        EventOccurrenceAttendee,
        EventOccurrenceAttendee.user_id,
        EventOccurrenceAttendee.occurrence_id == occurrence.id,
        EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe,
    )
    organizer_count = count_visible(EventOrganizer, EventOrganizer.user_id, EventOrganizer.event_id == event.id)
    subscriber_count = count_visible(
        EventSubscription, EventSubscription.user_id, EventSubscription.event_id == event.id
    )

    photo_url = occurrence.photo.full_url if occurrence.photo else None

    online_information = None
    if occurrence.link:
        online_information = events_pb2.OnlineEventInformation(link=occurrence.link)

    offline_information = None
    if occurrence.geom:
        offline_information = events_pb2.OfflineEventInformation(
            lat=occurrence.coordinates[0],
            lng=occurrence.coordinates[1],
            address=occurrence.address,
        )

    is_organizer = event.organizers.where(EventOrganizer.user_id == viewer_id).one_or_none() is not None
    is_subscriber = event.subscribers.where(EventSubscription.user_id == viewer_id).one_or_none() is not None

    return events_pb2.Event(
        event_id=occurrence.id,
        is_next=is_next,
        title=event.title,
        slug=event.slug,
        content=occurrence.content,
        photo_url=photo_url,
        online_information=online_information,
        offline_information=offline_information,
        created=Timestamp_from_datetime(occurrence.created),
        last_edited=Timestamp_from_datetime(occurrence.last_edited),
        creator_user_id=occurrence.creator_user_id,
        start_time=Timestamp_from_datetime(occurrence.start_time),
        end_time=Timestamp_from_datetime(occurrence.end_time),
        timezone=occurrence.timezone,
        start_time_display=str(occurrence.start_time),
        end_time_display=str(occurrence.end_time),
        attendance_state=attendancestate2api[attendance_state],
        organizer=is_organizer,
        subscriber=is_subscriber,
        going_count=going_count,
        maybe_count=maybe_count,
        organizer_count=organizer_count,
        subscriber_count=subscriber_count,
        owner_user_id=event.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(event.thread_id),
        can_edit=_is_event_owner(event, viewer_id),
        can_moderate=can_moderate,
    )
def _check_occurrence_time_validity(start_time, end_time, context):
    """
    Validates the time span of an occurrence, aborting the call with
    INVALID_ARGUMENT on the first rule that is broken.

    Rules, in order: the start may not be in the past, the end may not precede
    the start, the span may not exceed 7 days, and the start may not be more
    than 365 days away.
    """
    # each rule is evaluated lazily, only once the previous ones were checked
    rules = (
        (lambda: start_time < now(), errors.EVENT_IN_PAST),
        (lambda: end_time < start_time, errors.EVENT_ENDS_BEFORE_STARTS),
        (lambda: end_time - start_time > timedelta(days=7), errors.EVENT_TOO_LONG),
        (lambda: start_time - now() > timedelta(days=365), errors.EVENT_TOO_FAR_IN_FUTURE),
    )
    for is_violated, error_message in rules:
        if is_violated():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, error_message)
176class Events(events_pb2_grpc.EventsServicer):
def CreateEvent(self, request, context):
    """
    Creates a new event with its first occurrence, owned by the calling user.

    The request must carry a title, content, and either online information
    (with a link) or offline information (with address and coordinates). The
    caller is also recorded as organizer, subscriber and "going" attendee.
    Returns the new occurrence as an `events_pb2.Event`.
    """
    invalid = grpc.StatusCode.INVALID_ARGUMENT

    if not request.title:
        context.abort(invalid, errors.MISSING_EVENT_TITLE)
    if not request.content:
        context.abort(invalid, errors.MISSING_EVENT_CONTENT)

    online = request.HasField("online_information")
    if online:
        geom, address = None, None
        link = request.online_information.link
        if not link:
            context.abort(invalid, errors.ONLINE_EVENT_REQUIRES_LINK)
    elif request.HasField("offline_information"):
        venue = request.offline_information
        if not venue.address or not venue.lat or not venue.lng:
            context.abort(invalid, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
        if venue.lat == 0 and venue.lng == 0:
            context.abort(invalid, errors.INVALID_COORDINATE)
        geom = create_coordinate(venue.lat, venue.lng)
        address = venue.address
        link = None
    else:
        context.abort(invalid, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)

    start_time = to_aware_datetime(request.start_time)
    end_time = to_aware_datetime(request.end_time)

    _check_occurrence_time_validity(start_time, end_time, context)

    with session_scope() as session:
        if not request.parent_community_id:
            # an online event has no location to derive a community from
            if online:
                context.abort(invalid, errors.ONLINE_EVENT_MISSING_PARENT_COMMUNITY)
            # parent community computed from geom
            parent_node = get_parent_node_at_location(session, geom)
        else:
            parent_node = session.execute(
                select(Node).where(Node.id == request.parent_community_id)
            ).scalar_one_or_none()

        if not parent_node:
            context.abort(invalid, errors.COMMUNITY_NOT_FOUND)

        if request.photo_key:
            upload = session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
            if not upload:
                context.abort(invalid, errors.PHOTO_NOT_FOUND)

        creator_id = context.user_id

        event = Event(
            title=request.title,
            parent_node_id=parent_node.id,
            owner_user_id=creator_id,
            thread=Thread(),
            creator_user_id=creator_id,
        )
        session.add(event)

        occurrence = EventOccurrence(
            event=event,
            content=request.content,
            geom=geom,
            address=address,
            link=link,
            # an empty key means "no photo"
            photo_key=request.photo_key or None,
            # timezone=timezone,
            during=DateTimeTZRange(start_time, end_time),
            creator_user_id=creator_id,
        )
        session.add(occurrence)

        # the creator organizes, follows and attends their own event
        session.add(EventOrganizer(user_id=creator_id, event=event))
        session.add(EventSubscription(user_id=creator_id, event=event))
        session.add(
            EventOccurrenceAttendee(
                user_id=creator_id,
                occurrence=occurrence,
                attendee_status=AttendeeStatus.going,
            )
        )

        session.commit()

        return event_to_pb(session, occurrence, context)
def ScheduleEvent(self, request, context):
    """
    Schedules a new occurrence of an existing event.

    `request.event_id` identifies any existing occurrence of the event. The
    caller must be allowed to edit the event, and the new time range may not
    overlap any existing occurrence of the same event. The caller is recorded
    as a "going" attendee of the new occurrence.

    Aborts with INVALID_ARGUMENT on missing content/location/link or an
    invalid time range, NOT_FOUND if the event does not exist,
    PERMISSION_DENIED if the caller may not edit it, and FAILED_PRECONDITION
    if the new occurrence would overlap an existing one.

    Returns the new occurrence as an `events_pb2.Event`.
    """
    if not request.content:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT)
    if request.HasField("online_information"):
        geom = None
        address = None
        # an online occurrence is only useful with a link; CreateEvent and
        # UpdateEvent enforce the same rule
        if not request.online_information.link:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
        link = request.online_information.link
    elif request.HasField("offline_information"):
        if not (
            request.offline_information.address
            and request.offline_information.lat
            and request.offline_information.lng
        ):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION)
        if request.offline_information.lat == 0 and request.offline_information.lng == 0:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
        geom = create_coordinate(request.offline_information.lat, request.offline_information.lng)
        address = request.offline_information.address
        link = None
    else:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK)

    start_time = to_aware_datetime(request.start_time)
    end_time = to_aware_datetime(request.end_time)

    _check_occurrence_time_validity(start_time, end_time, context)

    with session_scope() as session:
        res = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
        ).one_or_none()

        if not res:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        # only the event is needed; the occurrence merely identified it
        event, _ = res

        if not _can_edit_event(session, event, context.user_id):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

        if (
            request.photo_key
            and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none()
        ):
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND)

        during = DateTimeTZRange(start_time, end_time)

        # && is the overlap operator for ranges
        if (
            session.execute(
                select(EventOccurrence.id)
                .where(EventOccurrence.event_id == event.id)
                .where(EventOccurrence.during.op("&&")(during))
            )
            .scalars()
            .first()
            is not None
        ):
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)

        occurrence = EventOccurrence(
            event=event,
            content=request.content,
            geom=geom,
            address=address,
            link=link,
            # an empty key means "no photo"
            photo_key=request.photo_key if request.photo_key != "" else None,
            # timezone=timezone,
            during=during,
            creator_user_id=context.user_id,
        )
        session.add(occurrence)

        attendee = EventOccurrenceAttendee(
            user_id=context.user_id,
            occurrence=occurrence,
            attendee_status=AttendeeStatus.going,
        )
        session.add(attendee)

        session.flush()

        # TODO: notify

        return event_to_pb(session, occurrence, context)
def UpdateEvent(self, request, context):
    """
    Updates an occurrence (and optionally all later occurrences of the same
    event).

    Only fields present in the request are changed. The title lives on the
    event itself; everything else is written to the occurrence(s). Start/end
    times cannot be changed together with `update_all_future`, and a changed
    time range may not overlap another occurrence of the same event.

    Aborts with NOT_FOUND if the occurrence does not exist, PERMISSION_DENIED
    if the caller may not edit the event, INVALID_ARGUMENT on invalid input,
    and FAILED_PRECONDITION on overlap or when editing a single occurrence
    that ended more than 24 hours ago.

    Returns the (refreshed) occurrence as an `events_pb2.Event`.
    """
    with session_scope() as session:
        res = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
        ).one_or_none()

        if not res:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        event, occurrence = res

        if not _can_edit_event(session, event, context.user_id):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

        # column values to write to the affected occurrence row(s)
        occurrence_update = {"last_edited": now()}

        if request.HasField("title"):
            event.title = request.title.value
            event.last_edited = now()

        if request.HasField("content"):
            occurrence_update["content"] = request.content.value

        if request.HasField("photo_key"):
            occurrence_update["photo_key"] = request.photo_key.value

        if request.HasField("online_information"):
            if not request.online_information.link:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK)
            # switching to online clears the physical location
            occurrence_update["link"] = request.online_information.link
            occurrence_update["geom"] = None
            occurrence_update["address"] = None
        elif request.HasField("offline_information"):
            # switching to offline clears the link
            occurrence_update["link"] = None
            if request.offline_information.lat == 0 and request.offline_information.lng == 0:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE)
            occurrence_update["geom"] = create_coordinate(
                request.offline_information.lat, request.offline_information.lng
            )
            occurrence_update["address"] = request.offline_information.address

        if request.HasField("start_time") or request.HasField("end_time"):
            if request.update_all_future:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_CANT_UPDATE_ALL_TIMES)
            # a time that is not supplied keeps its current value
            if request.HasField("start_time"):
                start_time = to_aware_datetime(request.start_time)
            else:
                start_time = occurrence.start_time
            if request.HasField("end_time"):
                end_time = to_aware_datetime(request.end_time)
            else:
                end_time = occurrence.end_time

            _check_occurrence_time_validity(start_time, end_time, context)

            during = DateTimeTZRange(start_time, end_time)

            # && is the overlap operator for ranges
            if (
                session.execute(
                    select(EventOccurrence.id)
                    .where(EventOccurrence.event_id == event.id)
                    .where(EventOccurrence.id != occurrence.id)
                    .where(EventOccurrence.during.op("&&")(during))
                )
                .scalars()
                .first()
                is not None
            ):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP)

            occurrence_update["during"] = during

        # TODO
        # if request.HasField("timezone"):
        #     occurrence_update["timezone"] = request.timezone

        # allow editing any event which hasn't ended more than 24 hours before now
        # when editing all future events, we edit all which have not yet ended

        if request.update_all_future:
            session.execute(
                update(EventOccurrence)
                # restrict the bulk update to this event: without this filter
                # the later occurrences of every other event would be
                # overwritten as well
                .where(EventOccurrence.event_id == event.id)
                .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
                .where(EventOccurrence.start_time >= occurrence.start_time)
                .values(occurrence_update)
                .execution_options(synchronize_session=False)
            )
        else:
            if occurrence.end_time < now() - timedelta(hours=24):
                context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT)
            session.execute(
                update(EventOccurrence)
                .where(EventOccurrence.end_time >= now() - timedelta(hours=24))
                .where(EventOccurrence.id == occurrence.id)
                .values(occurrence_update)
                .execution_options(synchronize_session=False)
            )

        # TODO notify

        session.flush()

        # since we have synchronize_session=False, we have to refresh the object
        session.refresh(occurrence)

        return event_to_pb(session, occurrence, context)
def GetEvent(self, request, context):
    """
    Returns the occurrence with id `request.event_id` as an `events_pb2.Event`,
    aborting with NOT_FOUND when there is no such occurrence.
    """
    with session_scope() as session:
        lookup = select(EventOccurrence).where(EventOccurrence.id == request.event_id)
        occurrence = session.execute(lookup).scalar_one_or_none()

        if not occurrence:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        return event_to_pb(session, occurrence, context)
def ListEventOccurrences(self, request, context):
    """
    Lists the occurrences of the event that the occurrence `request.event_id`
    belongs to, one page at a time.

    With `request.past` unset, occurrences ending after the page token are
    returned in ascending start order; with it set, occurrences ending before
    the page token are returned in descending start order. Without a page
    token, the current time is used.

    Aborts with NOT_FOUND when the occurrence does not exist.
    """
    with session_scope() as session:
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp of where we left off
        page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()
        occurrence = session.execute(
            select(EventOccurrence).where(EventOccurrence.id == request.event_id)
        ).scalar_one_or_none()
        if not occurrence:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        # only occurrences of the same event as the requested occurrence;
        # comparing against the Event.id column instead would match the
        # occurrences of every event
        occurrences = select(EventOccurrence).where(EventOccurrence.event_id == occurrence.event_id)

        if not request.past:
            occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.asc()
            )
        else:
            occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.desc()
            )

        # fetch one extra row to learn whether another page exists
        occurrences = occurrences.limit(page_size + 1)
        occurrences = session.execute(occurrences).scalars().all()

        return events_pb2.ListEventOccurrencesRes(
            events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]],
            next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None,
        )
def ListEventAttendees(self, request, context):
    """
    Lists the user ids of the visible attendees of an occurrence, ordered by
    user id and paginated; the page token is the user id to resume from.

    Aborts with NOT_FOUND when the occurrence does not exist.
    """
    with session_scope() as session:
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        first_user_id = int(request.page_token) if request.page_token else 0

        occurrence = session.execute(
            select(EventOccurrence).where(EventOccurrence.id == request.event_id)
        ).scalar_one_or_none()
        if not occurrence:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        # one row more than a page, to detect whether there is a next page
        query = (
            select(EventOccurrenceAttendee)
            .where_users_column_visible(context, EventOccurrenceAttendee.user_id)
            .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
            .where(EventOccurrenceAttendee.user_id >= first_user_id)
            .order_by(EventOccurrenceAttendee.user_id)
            .limit(page_size + 1)
        )
        rows = session.execute(query).scalars().all()

        next_page_token = None
        if len(rows) > page_size:
            next_page_token = str(rows[-1].user_id)

        return events_pb2.ListEventAttendeesRes(
            attendee_user_ids=[row.user_id for row in rows[:page_size]],
            next_page_token=next_page_token,
        )
def ListEventSubscribers(self, request, context):
    """
    Lists the user ids of the visible subscribers of the event that the given
    occurrence belongs to, ordered by user id and paginated; the page token is
    the user id to resume from.

    Aborts with NOT_FOUND when the occurrence does not exist.
    """
    with session_scope() as session:
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        first_user_id = int(request.page_token) if request.page_token else 0

        found = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
        ).one_or_none()
        if not found:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
        event, occurrence = found

        # one row more than a page, to detect whether there is a next page
        query = (
            select(EventSubscription)
            .where_users_column_visible(context, EventSubscription.user_id)
            .where(EventSubscription.event_id == event.id)
            .where(EventSubscription.user_id >= first_user_id)
            .order_by(EventSubscription.user_id)
            .limit(page_size + 1)
        )
        rows = session.execute(query).scalars().all()

        next_page_token = None
        if len(rows) > page_size:
            next_page_token = str(rows[-1].user_id)

        return events_pb2.ListEventSubscribersRes(
            subscriber_user_ids=[row.user_id for row in rows[:page_size]],
            next_page_token=next_page_token,
        )
def ListEventOrganizers(self, request, context):
    """
    Lists the user ids of the visible organizers of the event that the given
    occurrence belongs to, ordered by user id and paginated; the page token is
    the user id to resume from.

    Aborts with NOT_FOUND when the occurrence does not exist.
    """
    with session_scope() as session:
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        first_user_id = int(request.page_token) if request.page_token else 0

        found = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
        ).one_or_none()
        if not found:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)
        event, occurrence = found

        # one row more than a page, to detect whether there is a next page
        query = (
            select(EventOrganizer)
            .where_users_column_visible(context, EventOrganizer.user_id)
            .where(EventOrganizer.event_id == event.id)
            .where(EventOrganizer.user_id >= first_user_id)
            .order_by(EventOrganizer.user_id)
            .limit(page_size + 1)
        )
        rows = session.execute(query).scalars().all()

        next_page_token = None
        if len(rows) > page_size:
            next_page_token = str(rows[-1].user_id)

        return events_pb2.ListEventOrganizersRes(
            organizer_user_ids=[row.user_id for row in rows[:page_size]],
            next_page_token=next_page_token,
        )
def TransferEvent(self, request, context):
    """
    Transfers ownership of an event from its current owner to a group or a
    community (both are clusters; a community is the official cluster of a
    node).

    Aborts with NOT_FOUND if the occurrence does not exist or no matching
    group/community is found (including when the request names no new owner
    at all), and with PERMISSION_DENIED if the caller may not edit the event.

    Returns the occurrence as an `events_pb2.Event`.
    """
    with session_scope() as session:
        res = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
        ).one_or_none()

        if not res:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        event, occurrence = res

        if not _can_edit_event(session, event, context.user_id):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_TRANSFER_PERMISSION_DENIED)

        # stays None when the `new_owner` oneof is not set, so that case is
        # reported as NOT_FOUND below instead of raising UnboundLocalError
        cluster = None
        new_owner_kind = request.WhichOneof("new_owner")
        if new_owner_kind == "new_owner_group_id":
            # groups are the non-official clusters, addressed by cluster id
            cluster = session.execute(
                select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id)
            ).scalar_one_or_none()
        elif new_owner_kind == "new_owner_community_id":
            # communities are addressed by node id; take the node's official cluster
            cluster = session.execute(
                select(Cluster)
                .where(Cluster.parent_node_id == request.new_owner_community_id)
                .where(Cluster.is_official_cluster)
            ).scalar_one_or_none()

        if not cluster:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND)

        event.owner_user = None
        event.owner_cluster = cluster

        session.commit()
        return event_to_pb(session, occurrence, context)
def SetEventSubscription(self, request, context):
    """
    Subscribes the calling user to, or unsubscribes them from, the event that
    the given occurrence belongs to, according to `request.subscribe`. Doing
    so when already in the requested state changes nothing.

    Aborts with NOT_FOUND when the occurrence does not exist. Returns the
    occurrence as an `events_pb2.Event`.
    """
    with session_scope() as session:
        found = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
        ).one_or_none()

        if not found:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        event, occurrence = found

        existing = session.execute(
            select(EventSubscription)
            .where(EventSubscription.user_id == context.user_id)
            .where(EventSubscription.event_id == event.id)
        ).scalar_one_or_none()

        if request.subscribe:
            # subscribing: only create a row if there is none yet
            if not existing:
                session.add(EventSubscription(user_id=context.user_id, event_id=event.id))
        elif existing:
            # unsubscribing: drop the row if there is one
            session.delete(existing)

        session.flush()

        return event_to_pb(session, occurrence, context)
def SetEventAttendance(self, request, context):
    """
    Sets the calling user's attendance state for an occurrence.

    "Not going" removes any existing attendance row; any other state updates
    the existing row or creates a new one.

    Aborts with NOT_FOUND when the occurrence does not exist. Returns the
    occurrence as an `events_pb2.Event`.
    """
    with session_scope() as session:
        occurrence = session.execute(
            select(EventOccurrence).where(EventOccurrence.id == request.event_id)
        ).scalar_one_or_none()

        if not occurrence:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        existing = session.execute(
            select(EventOccurrenceAttendee)
            .where(EventOccurrenceAttendee.user_id == context.user_id)
            .where(EventOccurrenceAttendee.occurrence_id == occurrence.id)
        ).scalar_one_or_none()

        requested_state = request.attendance_state

        if requested_state == events_pb2.ATTENDANCE_STATE_NOT_GOING:
            # not going is represented by the absence of a row
            if existing:
                session.delete(existing)
        elif existing:
            existing.attendee_status = attendancestate2sql[requested_state]
        else:
            session.add(
                EventOccurrenceAttendee(
                    user_id=context.user_id,
                    occurrence_id=occurrence.id,
                    attendee_status=attendancestate2sql[requested_state],
                )
            )

        session.flush()

        return event_to_pb(session, occurrence, context)
def ListMyEvents(self, request, context):
    """
    Lists occurrences related to the calling user, one page at a time.

    The request flags `subscribed`, `organizing`, `attending` and
    `my_communities` select which relations count; when none is set, all of
    them do. An occurrence is included if it matches any selected relation.
    Time filtering, ordering and the page token work as in the other
    occurrence listings: upcoming first unless `request.past` is set.
    """
    with session_scope() as session:
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp of where we left off
        page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

        query = select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id)

        # no flag set means "everything"
        include_all = not any(
            (request.subscribed, request.attending, request.organizing, request.my_communities)
        )
        include_subscribed = request.subscribed or include_all
        include_organizing = request.organizing or include_all
        include_attending = request.attending or include_all
        include_my_communities = request.my_communities or include_all

        # each selected relation adds an outer join restricted to the caller
        # plus a "row exists" condition; the conditions are OR-ed together
        conditions = []

        if include_subscribed:
            query = query.outerjoin(
                EventSubscription,
                and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id),
            )
            conditions.append(EventSubscription.user_id.isnot(None))

        if include_organizing:
            query = query.outerjoin(
                EventOrganizer,
                and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id),
            )
            conditions.append(EventOrganizer.user_id.isnot(None))

        if include_attending:
            query = query.outerjoin(
                EventOccurrenceAttendee,
                and_(
                    EventOccurrenceAttendee.occurrence_id == EventOccurrence.id,
                    EventOccurrenceAttendee.user_id == context.user_id,
                ),
            )
            conditions.append(EventOccurrenceAttendee.user_id.isnot(None))

        if include_my_communities:
            # ids of the nodes whose official cluster the caller is subscribed to
            community_query = (
                select(Node.id)
                .join(Cluster, Cluster.parent_node_id == Node.id)
                .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id)
                .where(ClusterSubscription.user_id == context.user_id)
                .where(Cluster.is_official_cluster)
                .order_by(Node.id)
                .limit(100000)
            )
            my_communities = session.execute(community_query).scalars().all()
            conditions.append(Event.parent_node_id.in_(my_communities))

        query = query.where(or_(*conditions))

        if request.past:
            query = query.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            query = query.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.asc()
            )

        # one row more than a page, to detect whether there is a next page
        rows = session.execute(query.limit(page_size + 1)).scalars().all()

        next_page_token = None
        if len(rows) > page_size:
            next_page_token = str(millis_from_dt(rows[-1].end_time))

        return events_pb2.ListMyEventsRes(
            events=[event_to_pb(session, row, context) for row in rows[:page_size]],
            next_page_token=next_page_token,
        )
def ListAllEvents(self, request, context):
    """
    Lists occurrences of all events, one page at a time: upcoming ones in
    ascending start order, or with `request.past` set, past ones in descending
    start order. Without a page token, the current time is used.
    """
    with session_scope() as session:
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
        # the page token is a unix timestamp of where we left off
        page_token = dt_from_millis(int(request.page_token)) if request.page_token else now()

        query = select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id)

        if request.past:
            query = query.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.desc()
            )
        else:
            query = query.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by(
                EventOccurrence.start_time.asc()
            )

        # one row more than a page, to detect whether there is a next page
        rows = session.execute(query.limit(page_size + 1)).scalars().all()

        next_page_token = None
        if len(rows) > page_size:
            next_page_token = str(millis_from_dt(rows[-1].end_time))

        return events_pb2.ListAllEventsRes(
            events=[event_to_pb(session, row, context) for row in rows[:page_size]],
            next_page_token=next_page_token,
        )
def InviteEventOrganizer(self, request, context):
    """
    Adds `request.user_id` as an organizer of the event that the given
    occurrence belongs to.

    Aborts with NOT_FOUND when the occurrence does not exist, and with
    PERMISSION_DENIED when the caller may not edit the event or the invited
    user is not visible to the caller. Returns an empty message.
    """
    with session_scope() as session:
        found = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
        ).one_or_none()

        if not found:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        event, occurrence = found

        if not _can_edit_event(session, event, context.user_id):
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED)

        invitee = session.execute(
            select(User).where_users_visible(context).where(User.id == request.user_id)
        ).scalar_one_or_none()
        if not invitee:
            context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.USER_NOT_FOUND)

        session.add(EventOrganizer(user_id=request.user_id, event=event))
        session.flush()

        # TODO: notify

        return empty_pb2.Empty()
def RemoveEventOrganizer(self, request, context):
    """
    Removes the calling user from the organizers of the event that the given
    occurrence belongs to.

    Aborts with NOT_FOUND when the occurrence does not exist, and with
    FAILED_PRECONDITION when the caller is the owning user of the event or is
    not one of its organizers. Returns an empty message.
    """
    with session_scope() as session:
        found = session.execute(
            select(Event, EventOccurrence)
            .where(EventOccurrence.id == request.event_id)
            .where(EventOccurrence.event_id == Event.id)
        ).one_or_none()

        if not found:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

        event, occurrence = found

        # the owning user cannot step down as organizer
        if event.owner_user_id == context.user_id:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_REMOVE_OWNER_AS_ORGANIZER)

        own_organizer_row = session.execute(
            select(EventOrganizer)
            .where(EventOrganizer.user_id == context.user_id)
            .where(EventOrganizer.event_id == event.id)
        ).scalar_one_or_none()

        if not own_organizer_row:
            context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_NOT_AN_ORGANIZER)

        session.delete(own_organizer_row)

        return empty_pb2.Empty()