Coverage for src/couchers/servicers/events.py: 91%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

351 statements  

1from datetime import timedelta 

2 

3import grpc 

4from google.protobuf import empty_pb2 

5from psycopg2.extras import DateTimeTZRange 

6from sqlalchemy.sql import and_, func, or_, update 

7 

8from couchers import errors 

9from couchers.db import can_moderate_node, get_parent_node_at_location, session_scope 

10from couchers.models import ( 

11 AttendeeStatus, 

12 Cluster, 

13 ClusterSubscription, 

14 Event, 

15 EventOccurrence, 

16 EventOccurrenceAttendee, 

17 EventOrganizer, 

18 EventSubscription, 

19 Node, 

20 Thread, 

21 Upload, 

22 User, 

23) 

24from couchers.servicers.threads import thread_to_pb 

25from couchers.sql import couchers_select as select 

26from couchers.utils import ( 

27 Timestamp_from_datetime, 

28 create_coordinate, 

29 dt_from_millis, 

30 millis_from_dt, 

31 now, 

32 to_aware_datetime, 

33) 

34from proto import events_pb2, events_pb2_grpc 

35 

# Maps the API attendance enum to the database attendee status; "not going" is
# represented by the absence of a status (None).
attendancestate2sql = {
    events_pb2.AttendanceState.ATTENDANCE_STATE_NOT_GOING: None,
    events_pb2.AttendanceState.ATTENDANCE_STATE_GOING: AttendeeStatus.going,
    events_pb2.AttendanceState.ATTENDANCE_STATE_MAYBE: AttendeeStatus.maybe,
}

# Reverse mapping (database attendee status -> API attendance enum), derived
# from the table above so the two can never drift apart.
attendancestate2api = {sql_status: api_state for api_state, sql_status in attendancestate2sql.items()}

# Upper bound on the number of items returned by any paginated listing below.
MAX_PAGINATION_LENGTH = 25

49 

50 

def _is_event_owner(event: Event, user_id):
    """
    Tells whether the given user can act as an owner of the event.

    If the event has an owning user, only that user counts as owner. Otherwise
    the event is owned by a cluster, and the user must be one of that cluster's
    admins.
    """
    if not event.owner_user:
        # no owning user, so the event is owned by a cluster
        matching_admin = event.owner_cluster.admins.where(User.id == user_id).one_or_none()
        return matching_admin is not None
    return event.owner_user_id == user_id

59 

60 

def _can_moderate_event(session, event: Event, user_id):
    """
    Tells whether the given user can moderate the event.

    This is the case if the user can moderate the parent node of the cluster
    owning the event (when there is one), or the event's own parent node.
    """
    owning_cluster = event.owner_cluster
    if owning_cluster is not None:
        # events owned by a cluster can be moderated by that cluster's moderators
        if can_moderate_node(session, user_id, owning_cluster.parent_node_id):
            return True

    # otherwise fall back to the node the event itself belongs to
    return can_moderate_node(session, user_id, event.parent_node_id)

68 

69 

def _can_edit_event(session, event, user_id):
    """
    Tells whether the given user may edit the event: either as an owner or as a
    moderator. The moderation check is only performed for non-owners.
    """
    if _is_event_owner(event, user_id):
        return True
    return _can_moderate_event(session, event, user_id)

72 

73 

def event_to_pb(session, occurrence: EventOccurrence, context):
    """
    Builds the `events_pb2.Event` message for one occurrence of an event, as
    seen by the user in `context`.

    Besides the occurrence's own fields, this fills in the calling user's
    attendance/organizer/subscriber state, counts restricted to users visible
    to the caller, the owner (user, community or group) and the caller's
    edit/moderation rights.
    """
    event = occurrence.event

    def _count_visible(model, *conditions):
        # counts rows of `model` whose user is visible to the caller and which match `conditions`
        query = select(func.count()).select_from(model).where_users_column_visible(context, model.user_id)
        for condition in conditions:
            query = query.where(condition)
        return session.execute(query).scalar_one()

    # the first occurrence (by end time) that has not ended yet
    upcoming = (
        event.occurrences.where(EventOccurrence.end_time >= now()).order_by(EventOccurrence.end_time.asc()).first()
    )
    is_next = occurrence.id == upcoming.id if upcoming else False

    # an official cluster stands for a community, any other cluster is a group
    owner_community_id = None
    owner_group_id = None
    if event.owner_cluster:
        if not event.owner_cluster.is_official_cluster:
            owner_group_id = event.owner_cluster.id
        else:
            owner_community_id = event.owner_cluster.parent_node_id

    my_attendance = occurrence.attendees.where(EventOccurrenceAttendee.user_id == context.user_id).one_or_none()
    my_status = my_attendance.attendee_status if my_attendance else None

    can_moderate = _can_moderate_event(session, event, context.user_id)

    going_count = _count_visible(
        EventOccurrenceAttendee,
        EventOccurrenceAttendee.occurrence_id == occurrence.id,
        EventOccurrenceAttendee.attendee_status == AttendeeStatus.going,
    )
    maybe_count = _count_visible(
        EventOccurrenceAttendee,
        EventOccurrenceAttendee.occurrence_id == occurrence.id,
        EventOccurrenceAttendee.attendee_status == AttendeeStatus.maybe,
    )
    organizer_count = _count_visible(EventOrganizer, EventOrganizer.event_id == event.id)
    subscriber_count = _count_visible(EventSubscription, EventSubscription.event_id == event.id)

    # online information is set when there is a link, offline information when there is a geometry
    online_information = None
    if occurrence.link:
        online_information = events_pb2.OnlineEventInformation(
            link=occurrence.link,
        )

    offline_information = None
    if occurrence.geom:
        offline_information = events_pb2.OfflineEventInformation(
            lat=occurrence.coordinates[0],
            lng=occurrence.coordinates[1],
            address=occurrence.address,
        )

    is_organizer = event.organizers.where(EventOrganizer.user_id == context.user_id).one_or_none() is not None
    is_subscriber = event.subscribers.where(EventSubscription.user_id == context.user_id).one_or_none() is not None

    return events_pb2.Event(
        event_id=occurrence.id,
        is_next=is_next,
        title=event.title,
        slug=event.slug,
        content=occurrence.content,
        photo_url=occurrence.photo.full_url if occurrence.photo else None,
        online_information=online_information,
        offline_information=offline_information,
        created=Timestamp_from_datetime(occurrence.created),
        last_edited=Timestamp_from_datetime(occurrence.last_edited),
        creator_user_id=occurrence.creator_user_id,
        start_time=Timestamp_from_datetime(occurrence.start_time),
        end_time=Timestamp_from_datetime(occurrence.end_time),
        timezone=occurrence.timezone,
        start_time_display=str(occurrence.start_time),
        end_time_display=str(occurrence.end_time),
        attendance_state=attendancestate2api[my_status],
        organizer=is_organizer,
        subscriber=is_subscriber,
        going_count=going_count,
        maybe_count=maybe_count,
        organizer_count=organizer_count,
        subscriber_count=subscriber_count,
        owner_user_id=event.owner_user_id,
        owner_community_id=owner_community_id,
        owner_group_id=owner_group_id,
        thread=thread_to_pb(event.thread_id),
        can_edit=_is_event_owner(event, context.user_id),
        can_moderate=can_moderate,
    )

163 

164 

def _check_occurrence_time_validity(start_time, end_time, context):
    """
    Aborts the call with INVALID_ARGUMENT if the time span of an occurrence is
    not acceptable: it starts in the past, ends before it starts, lasts more
    than 7 days, or starts more than 365 days from now.
    """
    # each rule is evaluated lazily and in order, so later rules are only
    # computed once the earlier ones have been checked
    rules = (
        (lambda: start_time < now(), errors.EVENT_IN_PAST),
        (lambda: end_time < start_time, errors.EVENT_ENDS_BEFORE_STARTS),
        (lambda: end_time - start_time > timedelta(days=7), errors.EVENT_TOO_LONG),
        (lambda: start_time - now() > timedelta(days=365), errors.EVENT_TOO_FAR_IN_FUTURE),
    )
    for is_violated, error in rules:
        if is_violated():
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, error)

174 

175 

176class Events(events_pb2_grpc.EventsServicer): 

177 def CreateEvent(self, request, context): 

178 if not request.title: 

179 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_TITLE) 

180 if not request.content: 

181 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT) 

182 if request.HasField("online_information"): 

183 online = True 

184 geom = None 

185 address = None 

186 if not request.online_information.link: 

187 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK) 

188 link = request.online_information.link 

189 elif request.HasField("offline_information"): 

190 online = False 

191 if not ( 

192 request.offline_information.address 

193 and request.offline_information.lat 

194 and request.offline_information.lng 

195 ): 

196 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION) 

197 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 

198 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

199 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

200 address = request.offline_information.address 

201 link = None 

202 else: 

203 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK) 

204 

205 start_time = to_aware_datetime(request.start_time) 

206 end_time = to_aware_datetime(request.end_time) 

207 

208 _check_occurrence_time_validity(start_time, end_time, context) 

209 

210 with session_scope() as session: 

211 if request.parent_community_id: 

212 parent_node = session.execute( 

213 select(Node).where(Node.id == request.parent_community_id) 

214 ).scalar_one_or_none() 

215 else: 

216 if online: 

217 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_MISSING_PARENT_COMMUNITY) 

218 # parent community computed from geom 

219 parent_node = get_parent_node_at_location(session, geom) 

220 

221 if not parent_node: 

222 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.COMMUNITY_NOT_FOUND) 

223 

224 if ( 

225 request.photo_key 

226 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

227 ): 

228 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

229 

230 event = Event( 

231 title=request.title, 

232 parent_node_id=parent_node.id, 

233 owner_user_id=context.user_id, 

234 thread=Thread(), 

235 creator_user_id=context.user_id, 

236 ) 

237 session.add(event) 

238 

239 occurrence = EventOccurrence( 

240 event=event, 

241 content=request.content, 

242 geom=geom, 

243 address=address, 

244 link=link, 

245 photo_key=request.photo_key if request.photo_key != "" else None, 

246 # timezone=timezone, 

247 during=DateTimeTZRange(start_time, end_time), 

248 creator_user_id=context.user_id, 

249 ) 

250 session.add(occurrence) 

251 

252 organizer = EventOrganizer( 

253 user_id=context.user_id, 

254 event=event, 

255 ) 

256 session.add(organizer) 

257 

258 subscription = EventSubscription( 

259 user_id=context.user_id, 

260 event=event, 

261 ) 

262 session.add(subscription) 

263 

264 attendee = EventOccurrenceAttendee( 

265 user_id=context.user_id, 

266 occurrence=occurrence, 

267 attendee_status=AttendeeStatus.going, 

268 ) 

269 session.add(attendee) 

270 

271 session.commit() 

272 

273 return event_to_pb(session, occurrence, context) 

274 

275 def ScheduleEvent(self, request, context): 

276 if not request.content: 

277 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_CONTENT) 

278 if request.HasField("online_information"): 

279 online = True 

280 geom = None 

281 address = None 

282 link = request.online_information.link 

283 elif request.HasField("offline_information"): 

284 online = False 

285 if not ( 

286 request.offline_information.address 

287 and request.offline_information.lat 

288 and request.offline_information.lng 

289 ): 

290 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_OR_LOCATION) 

291 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 

292 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

293 geom = create_coordinate(request.offline_information.lat, request.offline_information.lng) 

294 address = request.offline_information.address 

295 link = None 

296 else: 

297 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_EVENT_ADDRESS_LOCATION_OR_LINK) 

298 

299 start_time = to_aware_datetime(request.start_time) 

300 end_time = to_aware_datetime(request.end_time) 

301 

302 _check_occurrence_time_validity(start_time, end_time, context) 

303 

304 with session_scope() as session: 

305 res = session.execute( 

306 select(Event, EventOccurrence) 

307 .where(EventOccurrence.id == request.event_id) 

308 .where(EventOccurrence.event_id == Event.id) 

309 ).one_or_none() 

310 

311 if not res: 

312 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

313 

314 event, occurrence = res 

315 

316 if not _can_edit_event(session, event, context.user_id): 

317 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED) 

318 

319 if ( 

320 request.photo_key 

321 and not session.execute(select(Upload).where(Upload.key == request.photo_key)).scalar_one_or_none() 

322 ): 

323 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.PHOTO_NOT_FOUND) 

324 

325 during = DateTimeTZRange(start_time, end_time) 

326 

327 # && is the overlap operator for ranges 

328 if ( 

329 session.execute( 

330 select(EventOccurrence.id) 

331 .where(EventOccurrence.event_id == event.id) 

332 .where(EventOccurrence.during.op("&&")(during)) 

333 ) 

334 .scalars() 

335 .first() 

336 is not None 

337 ): 

338 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP) 

339 

340 occurrence = EventOccurrence( 

341 event=event, 

342 content=request.content, 

343 geom=geom, 

344 address=address, 

345 link=link, 

346 photo_key=request.photo_key if request.photo_key != "" else None, 

347 # timezone=timezone, 

348 during=during, 

349 creator_user_id=context.user_id, 

350 ) 

351 session.add(occurrence) 

352 

353 attendee = EventOccurrenceAttendee( 

354 user_id=context.user_id, 

355 occurrence=occurrence, 

356 attendee_status=AttendeeStatus.going, 

357 ) 

358 session.add(attendee) 

359 

360 session.flush() 

361 

362 # TODO: notify 

363 

364 return event_to_pb(session, occurrence, context) 

365 

366 def UpdateEvent(self, request, context): 

367 with session_scope() as session: 

368 res = session.execute( 

369 select(Event, EventOccurrence) 

370 .where(EventOccurrence.id == request.event_id) 

371 .where(EventOccurrence.event_id == Event.id) 

372 ).one_or_none() 

373 

374 if not res: 

375 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

376 

377 event, occurrence = res 

378 

379 if not _can_edit_event(session, event, context.user_id): 

380 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED) 

381 

382 occurrence_update = {"last_edited": now()} 

383 

384 if request.HasField("title"): 

385 event.title = request.title.value 

386 event.last_edited = now() 

387 

388 if request.HasField("content"): 

389 occurrence_update["content"] = request.content.value 

390 

391 if request.HasField("photo_key"): 

392 occurrence_update["photo_key"] = request.photo_key.value 

393 

394 if request.HasField("online_information"): 

395 if not request.online_information.link: 

396 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ONLINE_EVENT_REQUIRES_LINK) 

397 occurrence_update["link"] = request.online_information.link 

398 occurrence_update["geom"] = None 

399 occurrence_update["address"] = None 

400 elif request.HasField("offline_information"): 

401 occurrence_update["link"] = None 

402 if request.offline_information.lat == 0 and request.offline_information.lng == 0: 

403 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.INVALID_COORDINATE) 

404 occurrence_update["geom"] = create_coordinate( 

405 request.offline_information.lat, request.offline_information.lng 

406 ) 

407 occurrence_update["address"] = request.offline_information.address 

408 

409 if request.HasField("start_time") or request.HasField("end_time"): 

410 if request.update_all_future: 

411 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.EVENT_CANT_UPDATE_ALL_TIMES) 

412 if request.HasField("start_time"): 

413 start_time = to_aware_datetime(request.start_time) 

414 else: 

415 start_time = occurrence.start_time 

416 if request.HasField("end_time"): 

417 end_time = to_aware_datetime(request.end_time) 

418 else: 

419 end_time = occurrence.end_time 

420 

421 _check_occurrence_time_validity(start_time, end_time, context) 

422 

423 during = DateTimeTZRange(start_time, end_time) 

424 

425 # && is the overlap operator for ranges 

426 if ( 

427 session.execute( 

428 select(EventOccurrence.id) 

429 .where(EventOccurrence.event_id == event.id) 

430 .where(EventOccurrence.id != occurrence.id) 

431 .where(EventOccurrence.during.op("&&")(during)) 

432 ) 

433 .scalars() 

434 .first() 

435 is not None 

436 ): 

437 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_OVERLAP) 

438 

439 occurrence_update["during"] = during 

440 

441 # TODO 

442 # if request.HasField("timezone"): 

443 # occurrence_update["timezone"] = request.timezone 

444 

445 # allow editing any event which hasn't ended more than 24 hours before now 

446 # when editing all future events, we edit all which have not yet ended 

447 

448 if request.update_all_future: 

449 session.execute( 

450 update(EventOccurrence) 

451 .where(EventOccurrence.end_time >= now() - timedelta(hours=24)) 

452 .where(EventOccurrence.start_time >= occurrence.start_time) 

453 .values(occurrence_update) 

454 .execution_options(synchronize_session=False) 

455 ) 

456 else: 

457 if occurrence.end_time < now() - timedelta(hours=24): 

458 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_UPDATE_OLD_EVENT) 

459 session.execute( 

460 update(EventOccurrence) 

461 .where(EventOccurrence.end_time >= now() - timedelta(hours=24)) 

462 .where(EventOccurrence.id == occurrence.id) 

463 .values(occurrence_update) 

464 .execution_options(synchronize_session=False) 

465 ) 

466 

467 # TODO notify 

468 

469 session.flush() 

470 

471 # since we have synchronize_session=False, we have to refresh the object 

472 session.refresh(occurrence) 

473 

474 return event_to_pb(session, occurrence, context) 

475 

476 def GetEvent(self, request, context): 

477 with session_scope() as session: 

478 occurrence = session.execute( 

479 select(EventOccurrence).where(EventOccurrence.id == request.event_id) 

480 ).scalar_one_or_none() 

481 

482 if not occurrence: 

483 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

484 

485 return event_to_pb(session, occurrence, context) 

486 

487 def ListEventOccurrences(self, request, context): 

488 with session_scope() as session: 

489 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

490 # the page token is a unix timestamp of where we left off 

491 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

492 occurrence = session.execute( 

493 select(EventOccurrence).where(EventOccurrence.id == request.event_id) 

494 ).scalar_one_or_none() 

495 if not occurrence: 

496 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

497 

498 occurrences = select(EventOccurrence).where(EventOccurrence.event_id == Event.id) 

499 

500 if not request.past: 

501 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

502 EventOccurrence.start_time.asc() 

503 ) 

504 else: 

505 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

506 EventOccurrence.start_time.desc() 

507 ) 

508 

509 occurrences = occurrences.limit(page_size + 1) 

510 occurrences = session.execute(occurrences).scalars().all() 

511 

512 return events_pb2.ListEventOccurrencesRes( 

513 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

514 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

515 ) 

516 

517 def ListEventAttendees(self, request, context): 

518 with session_scope() as session: 

519 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

520 next_user_id = int(request.page_token) if request.page_token else 0 

521 occurrence = session.execute( 

522 select(EventOccurrence).where(EventOccurrence.id == request.event_id) 

523 ).scalar_one_or_none() 

524 if not occurrence: 

525 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

526 attendees = ( 

527 session.execute( 

528 select(EventOccurrenceAttendee) 

529 .where_users_column_visible(context, EventOccurrenceAttendee.user_id) 

530 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

531 .where(EventOccurrenceAttendee.user_id >= next_user_id) 

532 .order_by(EventOccurrenceAttendee.user_id) 

533 .limit(page_size + 1) 

534 ) 

535 .scalars() 

536 .all() 

537 ) 

538 return events_pb2.ListEventAttendeesRes( 

539 attendee_user_ids=[attendee.user_id for attendee in attendees[:page_size]], 

540 next_page_token=str(attendees[-1].user_id) if len(attendees) > page_size else None, 

541 ) 

542 

543 def ListEventSubscribers(self, request, context): 

544 with session_scope() as session: 

545 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

546 next_user_id = int(request.page_token) if request.page_token else 0 

547 res = session.execute( 

548 select(Event, EventOccurrence) 

549 .where(EventOccurrence.id == request.event_id) 

550 .where(EventOccurrence.event_id == Event.id) 

551 ).one_or_none() 

552 if not res: 

553 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

554 event, occurrence = res 

555 subscribers = ( 

556 session.execute( 

557 select(EventSubscription) 

558 .where_users_column_visible(context, EventSubscription.user_id) 

559 .where(EventSubscription.event_id == event.id) 

560 .where(EventSubscription.user_id >= next_user_id) 

561 .order_by(EventSubscription.user_id) 

562 .limit(page_size + 1) 

563 ) 

564 .scalars() 

565 .all() 

566 ) 

567 return events_pb2.ListEventSubscribersRes( 

568 subscriber_user_ids=[subscriber.user_id for subscriber in subscribers[:page_size]], 

569 next_page_token=str(subscribers[-1].user_id) if len(subscribers) > page_size else None, 

570 ) 

571 

572 def ListEventOrganizers(self, request, context): 

573 with session_scope() as session: 

574 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

575 next_user_id = int(request.page_token) if request.page_token else 0 

576 res = session.execute( 

577 select(Event, EventOccurrence) 

578 .where(EventOccurrence.id == request.event_id) 

579 .where(EventOccurrence.event_id == Event.id) 

580 ).one_or_none() 

581 if not res: 

582 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

583 event, occurrence = res 

584 organizers = ( 

585 session.execute( 

586 select(EventOrganizer) 

587 .where_users_column_visible(context, EventOrganizer.user_id) 

588 .where(EventOrganizer.event_id == event.id) 

589 .where(EventOrganizer.user_id >= next_user_id) 

590 .order_by(EventOrganizer.user_id) 

591 .limit(page_size + 1) 

592 ) 

593 .scalars() 

594 .all() 

595 ) 

596 return events_pb2.ListEventOrganizersRes( 

597 organizer_user_ids=[organizer.user_id for organizer in organizers[:page_size]], 

598 next_page_token=str(organizers[-1].user_id) if len(organizers) > page_size else None, 

599 ) 

600 

601 def TransferEvent(self, request, context): 

602 with session_scope() as session: 

603 res = session.execute( 

604 select(Event, EventOccurrence) 

605 .where(EventOccurrence.id == request.event_id) 

606 .where(EventOccurrence.event_id == Event.id) 

607 ).one_or_none() 

608 

609 if not res: 

610 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

611 

612 event, occurrence = res 

613 

614 if not _can_edit_event(session, event, context.user_id): 

615 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_TRANSFER_PERMISSION_DENIED) 

616 

617 if request.WhichOneof("new_owner") == "new_owner_group_id": 

618 cluster = session.execute( 

619 select(Cluster).where(~Cluster.is_official_cluster).where(Cluster.id == request.new_owner_group_id) 

620 ).scalar_one_or_none() 

621 elif request.WhichOneof("new_owner") == "new_owner_community_id": 

622 cluster = session.execute( 

623 select(Cluster) 

624 .where(Cluster.parent_node_id == request.new_owner_community_id) 

625 .where(Cluster.is_official_cluster) 

626 ).scalar_one_or_none() 

627 

628 if not cluster: 

629 context.abort(grpc.StatusCode.NOT_FOUND, errors.GROUP_OR_COMMUNITY_NOT_FOUND) 

630 

631 event.owner_user = None 

632 event.owner_cluster = cluster 

633 

634 session.commit() 

635 return event_to_pb(session, occurrence, context) 

636 

637 def SetEventSubscription(self, request, context): 

638 with session_scope() as session: 

639 res = session.execute( 

640 select(Event, EventOccurrence) 

641 .where(EventOccurrence.id == request.event_id) 

642 .where(EventOccurrence.event_id == Event.id) 

643 ).one_or_none() 

644 

645 if not res: 

646 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

647 

648 event, occurrence = res 

649 

650 current_subscription = session.execute( 

651 select(EventSubscription) 

652 .where(EventSubscription.user_id == context.user_id) 

653 .where(EventSubscription.event_id == event.id) 

654 ).scalar_one_or_none() 

655 

656 # if not subscribed, subscribe 

657 if request.subscribe and not current_subscription: 

658 session.add(EventSubscription(user_id=context.user_id, event_id=event.id)) 

659 

660 # if subscribed but unsubbing, remove subscription 

661 if not request.subscribe and current_subscription: 

662 session.delete(current_subscription) 

663 

664 session.flush() 

665 

666 return event_to_pb(session, occurrence, context) 

667 

668 def SetEventAttendance(self, request, context): 

669 with session_scope() as session: 

670 occurrence = session.execute( 

671 select(EventOccurrence).where(EventOccurrence.id == request.event_id) 

672 ).scalar_one_or_none() 

673 

674 if not occurrence: 

675 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

676 

677 current_attendance = session.execute( 

678 select(EventOccurrenceAttendee) 

679 .where(EventOccurrenceAttendee.user_id == context.user_id) 

680 .where(EventOccurrenceAttendee.occurrence_id == occurrence.id) 

681 ).scalar_one_or_none() 

682 

683 if request.attendance_state == events_pb2.ATTENDANCE_STATE_NOT_GOING: 

684 if current_attendance: 

685 session.delete(current_attendance) 

686 # if unset/not going, nothing to do! 

687 else: 

688 if current_attendance: 

689 current_attendance.attendee_status = attendancestate2sql[request.attendance_state] 

690 else: 

691 # create new 

692 attendance = EventOccurrenceAttendee( 

693 user_id=context.user_id, 

694 occurrence_id=occurrence.id, 

695 attendee_status=attendancestate2sql[request.attendance_state], 

696 ) 

697 session.add(attendance) 

698 

699 session.flush() 

700 

701 return event_to_pb(session, occurrence, context) 

702 

703 def ListMyEvents(self, request, context): 

704 with session_scope() as session: 

705 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

706 # the page token is a unix timestamp of where we left off 

707 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

708 

709 occurrences = select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id) 

710 

711 include_all = not (request.subscribed or request.attending or request.organizing or request.my_communities) 

712 include_subscribed = request.subscribed or include_all 

713 include_organizing = request.organizing or include_all 

714 include_attending = request.attending or include_all 

715 include_my_communities = request.my_communities or include_all 

716 

717 where_ = [] 

718 

719 if include_subscribed: 

720 occurrences = occurrences.outerjoin( 

721 EventSubscription, 

722 and_(EventSubscription.event_id == Event.id, EventSubscription.user_id == context.user_id), 

723 ) 

724 where_.append(EventSubscription.user_id != None) 

725 if include_organizing: 

726 occurrences = occurrences.outerjoin( 

727 EventOrganizer, and_(EventOrganizer.event_id == Event.id, EventOrganizer.user_id == context.user_id) 

728 ) 

729 where_.append(EventOrganizer.user_id != None) 

730 if include_attending: 

731 occurrences = occurrences.outerjoin( 

732 EventOccurrenceAttendee, 

733 and_( 

734 EventOccurrenceAttendee.occurrence_id == EventOccurrence.id, 

735 EventOccurrenceAttendee.user_id == context.user_id, 

736 ), 

737 ) 

738 where_.append(EventOccurrenceAttendee.user_id != None) 

739 if include_my_communities: 

740 my_communities = ( 

741 session.execute( 

742 select(Node.id) 

743 .join(Cluster, Cluster.parent_node_id == Node.id) 

744 .join(ClusterSubscription, ClusterSubscription.cluster_id == Cluster.id) 

745 .where(ClusterSubscription.user_id == context.user_id) 

746 .where(Cluster.is_official_cluster) 

747 .order_by(Node.id) 

748 .limit(100000) 

749 ) 

750 .scalars() 

751 .all() 

752 ) 

753 where_.append(Event.parent_node_id.in_(my_communities)) 

754 

755 occurrences = occurrences.where(or_(*where_)) 

756 

757 if not request.past: 

758 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

759 EventOccurrence.start_time.asc() 

760 ) 

761 else: 

762 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

763 EventOccurrence.start_time.desc() 

764 ) 

765 

766 occurrences = occurrences.limit(page_size + 1) 

767 occurrences = session.execute(occurrences).scalars().all() 

768 

769 return events_pb2.ListMyEventsRes( 

770 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

771 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

772 ) 

773 

774 def ListAllEvents(self, request, context): 

775 with session_scope() as session: 

776 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

777 # the page token is a unix timestamp of where we left off 

778 page_token = dt_from_millis(int(request.page_token)) if request.page_token else now() 

779 

780 occurrences = select(EventOccurrence).join(Event, Event.id == EventOccurrence.event_id) 

781 

782 if not request.past: 

783 occurrences = occurrences.where(EventOccurrence.end_time > page_token - timedelta(seconds=1)).order_by( 

784 EventOccurrence.start_time.asc() 

785 ) 

786 else: 

787 occurrences = occurrences.where(EventOccurrence.end_time < page_token + timedelta(seconds=1)).order_by( 

788 EventOccurrence.start_time.desc() 

789 ) 

790 

791 occurrences = occurrences.limit(page_size + 1) 

792 occurrences = session.execute(occurrences).scalars().all() 

793 

794 return events_pb2.ListAllEventsRes( 

795 events=[event_to_pb(session, occurrence, context) for occurrence in occurrences[:page_size]], 

796 next_page_token=str(millis_from_dt(occurrences[-1].end_time)) if len(occurrences) > page_size else None, 

797 ) 

798 

799 def InviteEventOrganizer(self, request, context): 

800 with session_scope() as session: 

801 res = session.execute( 

802 select(Event, EventOccurrence) 

803 .where(EventOccurrence.id == request.event_id) 

804 .where(EventOccurrence.event_id == Event.id) 

805 ).one_or_none() 

806 

807 if not res: 

808 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

809 

810 event, occurrence = res 

811 

812 if not _can_edit_event(session, event, context.user_id): 

813 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.EVENT_EDIT_PERMISSION_DENIED) 

814 

815 if not session.execute( 

816 select(User).where_users_visible(context).where(User.id == request.user_id) 

817 ).scalar_one_or_none(): 

818 context.abort(grpc.StatusCode.PERMISSION_DENIED, errors.USER_NOT_FOUND) 

819 

820 organizer = EventOrganizer( 

821 user_id=request.user_id, 

822 event=event, 

823 ) 

824 session.add(organizer) 

825 session.flush() 

826 

827 # TODO: notify 

828 

829 return empty_pb2.Empty() 

830 

831 def RemoveEventOrganizer(self, request, context): 

832 with session_scope() as session: 

833 res = session.execute( 

834 select(Event, EventOccurrence) 

835 .where(EventOccurrence.id == request.event_id) 

836 .where(EventOccurrence.event_id == Event.id) 

837 ).one_or_none() 

838 

839 if not res: 

840 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

841 

842 event, occurrence = res 

843 

844 if event.owner_user_id == context.user_id: 

845 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_CANT_REMOVE_OWNER_AS_ORGANIZER) 

846 

847 current = session.execute( 

848 select(EventOrganizer) 

849 .where(EventOrganizer.user_id == context.user_id) 

850 .where(EventOrganizer.event_id == event.id) 

851 ).scalar_one_or_none() 

852 

853 if not current: 

854 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_NOT_AN_ORGANIZER) 

855 

856 session.delete(current) 

857 

858 return empty_pb2.Empty()