Coverage for src/couchers/servicers/admin.py: 67%

393 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-02 02:47 +0000

1import json 

2import logging 

3from datetime import timedelta 

4 

5import grpc 

6from geoalchemy2.shape import from_shape 

7from google.protobuf import empty_pb2 

8from shapely.geometry import shape 

9from sqlalchemy.sql import and_, func, or_, select, update 

10from user_agents import parse as user_agents_parse 

11 

12from couchers import errors, urls 

13from couchers.crypto import urlsafe_secure_token 

14from couchers.db import session_scope 

15from couchers.helpers.badges import user_add_badge, user_remove_badge 

16from couchers.helpers.clusters import create_cluster, create_node 

17from couchers.helpers.geoip import geoip_approximate_location, geoip_asn 

18from couchers.jobs.enqueue import queue_job 

19from couchers.models import ( 

20 AccountDeletionToken, 

21 Comment, 

22 ContentReport, 

23 Discussion, 

24 Event, 

25 EventCommunityInviteRequest, 

26 EventOccurrence, 

27 GroupChat, 

28 GroupChatSubscription, 

29 HostRequest, 

30 LanguageAbility, 

31 Message, 

32 ModNote, 

33 Node, 

34 Reference, 

35 Reply, 

36 User, 

37 UserActivity, 

38 UserBadge, 

39) 

40from couchers.notifications.notify import notify 

41from couchers.resources import get_badge_dict 

42from couchers.servicers.api import get_strong_verification_fields, user_model_to_pb 

43from couchers.servicers.auth import create_session 

44from couchers.servicers.communities import community_to_pb 

45from couchers.servicers.events import get_users_to_notify_for_new_event 

46from couchers.servicers.threads import unpack_thread_id 

47from couchers.sql import couchers_select as select 

48from couchers.utils import Timestamp_from_datetime, date_to_api, make_user_context, now, parse_date, to_aware_datetime 

49from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2 

50from proto.internal import jobs_pb2 

51 

52logger = logging.getLogger(__name__) 

53 

54MAX_PAGINATION_LENGTH = 250 

55 

56 

57def _user_to_details(session, user): 

58 return admin_pb2.UserDetails( 

59 user_id=user.id, 

60 username=user.username, 

61 name=user.name, 

62 email=user.email, 

63 gender=user.gender, 

64 birthdate=date_to_api(user.birthdate), 

65 banned=user.is_banned, 

66 deleted=user.is_deleted, 

67 do_not_email=user.do_not_email, 

68 badges=[badge.badge_id for badge in user.badges], 

69 **get_strong_verification_fields(session, user), 

70 has_passport_sex_gender_exception=user.has_passport_sex_gender_exception, 

71 admin_note=user.admin_note, 

72 pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(), 

73 acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(), 

74 ) 

75 

76 

77def _content_report_to_pb(content_report: ContentReport): 

78 return admin_pb2.ContentReport( 

79 content_report_id=content_report.id, 

80 time=Timestamp_from_datetime(content_report.time), 

81 reporting_user_id=content_report.reporting_user_id, 

82 author_user_id=content_report.author_user_id, 

83 reason=content_report.reason, 

84 description=content_report.description, 

85 content_ref=content_report.content_ref, 

86 user_agent=content_report.user_agent, 

87 page=content_report.page, 

88 ) 

89 

90 

91def append_admin_note(session, context, user, note): 

92 if not note.strip(): 

93 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY) 

94 admin = session.execute(select(User).where(User.id == context.user_id)).scalar_one() 

95 user.admin_note += f"\n[{now().isoformat()}] (id: {admin.id}, username: {admin.username}) {note}\n" 

96 

97 

98def load_community_geom(geojson, context): 

99 geom = shape(json.loads(geojson)) 

100 

101 if geom.geom_type != "MultiPolygon": 

102 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON) 

103 

104 return geom 

105 

106 

107def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload): 

108 with session_scope() as session: 

109 all_users = session.execute(select(User).where(User.is_visible)).scalars().all() 

110 for user in all_users: 

111 context = make_user_context(user_id=user.id) 

112 notify( 

113 session, 

114 user_id=user.id, 

115 topic_action="general:new_blog_post", 

116 data=notification_data_pb2.GeneralNewBlogPost( 

117 url=payload.url, 

118 title=payload.title, 

119 blurb=payload.blurb, 

120 ), 

121 ) 

122 

123 

124class Admin(admin_pb2_grpc.AdminServicer): 

125 def GetUserDetails(self, request, context, session): 

126 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

127 if not user: 

128 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

129 return _user_to_details(session, user) 

130 

131 def GetUser(self, request, context, session): 

132 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

133 if not user: 

134 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

135 return user_model_to_pb(user, session, context) 

136 

137 def SearchUsers(self, request, context, session): 

138 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

139 next_user_id = int(request.page_token) if request.page_token else 0 

140 statement = select(User) 

141 if request.username: 

142 statement = statement.where(User.username.ilike(request.username)) 

143 if request.email: 

144 statement = statement.where(User.email.ilike(request.email)) 

145 if request.name: 

146 statement = statement.where(User.name.ilike(request.name)) 

147 if request.admin_note: 

148 statement = statement.where(User.admin_note.ilike(request.admin_note)) 

149 if request.city: 

150 statement = statement.where(User.city.ilike(request.city)) 

151 if request.min_user_id: 

152 statement = statement.where(User.id >= request.min_user_id) 

153 if request.max_user_id: 

154 statement = statement.where(User.id <= request.max_user_id) 

155 if request.min_birthdate: 

156 statement = statement.where(User.birthdate >= parse_date(request.min_birthdate)) 

157 if request.max_birthdate: 

158 statement = statement.where(User.birthdate <= parse_date(request.max_birthdate)) 

159 if request.genders: 

160 statement = statement.where(User.gender.in_(request.genders)) 

161 if request.min_joined_date: 

162 statement = statement.where(User.joined >= parse_date(request.min_joined_date)) 

163 if request.max_joined_date: 

164 statement = statement.where(User.joined <= parse_date(request.max_joined_date)) 

165 if request.min_last_active_date: 

166 statement = statement.where(User.last_active >= parse_date(request.min_last_active_date)) 

167 if request.max_last_active_date: 

168 statement = statement.where(User.last_active <= parse_date(request.max_last_active_date)) 

169 if request.genders: 

170 statement = statement.where(User.gender.in_(request.genders)) 

171 if request.language_codes: 

172 statement = statement.join( 

173 LanguageAbility, 

174 and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)), 

175 ) 

176 if request.HasField("is_deleted"): 

177 statement = statement.where(User.is_deleted == request.is_deleted.value) 

178 if request.HasField("is_banned"): 

179 statement = statement.where(User.is_banned == request.is_banned.value) 

180 if request.HasField("has_avatar"): 

181 if request.has_avatar.value: 

182 statement = statement.where(User.avatar_key != None) 

183 else: 

184 statement = statement.where(User.avatar_key == None) 

185 users = ( 

186 session.execute(statement.where(User.id >= next_user_id).order_by(User.id).limit(page_size + 1)) 

187 .scalars() 

188 .all() 

189 ) 

190 logger.info(users) 

191 return admin_pb2.SearchUsersRes( 

192 users=[_user_to_details(session, user) for user in users[:page_size]], 

193 next_page_token=str(users[-1].id) if len(users) > page_size else None, 

194 ) 

195 

196 def ChangeUserGender(self, request, context, session): 

197 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

198 if not user: 

199 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

200 user.gender = request.gender 

201 session.commit() 

202 

203 notify( 

204 session, 

205 user_id=user.id, 

206 topic_action="gender:change", 

207 data=notification_data_pb2.GenderChange( 

208 gender=request.gender, 

209 ), 

210 ) 

211 

212 return _user_to_details(session, user) 

213 

214 def ChangeUserBirthdate(self, request, context, session): 

215 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

216 if not user: 

217 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

218 user.birthdate = parse_date(request.birthdate) 

219 session.commit() 

220 

221 notify( 

222 session, 

223 user_id=user.id, 

224 topic_action="birthdate:change", 

225 data=notification_data_pb2.BirthdateChange( 

226 birthdate=request.birthdate, 

227 ), 

228 ) 

229 

230 return _user_to_details(session, user) 

231 

232 def AddBadge(self, request, context, session): 

233 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

234 if not user: 

235 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

236 

237 badge = get_badge_dict().get(request.badge_id) 

238 if not badge: 

239 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND) 

240 

241 if not badge["admin_editable"]: 

242 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE) 

243 

244 if badge["id"] in [b.badge_id for b in user.badges]: 

245 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE) 

246 

247 user_add_badge(session, user.id, request.badge_id) 

248 

249 return _user_to_details(session, user) 

250 

251 def RemoveBadge(self, request, context, session): 

252 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

253 if not user: 

254 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

255 

256 badge = get_badge_dict().get(request.badge_id) 

257 if not badge: 

258 context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND) 

259 

260 if not badge["admin_editable"]: 

261 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE) 

262 

263 user_badge = session.execute( 

264 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"]) 

265 ).scalar_one_or_none() 

266 if not user_badge: 

267 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE) 

268 

269 user_remove_badge(session, user.id, request.badge_id) 

270 

271 return _user_to_details(session, user) 

272 

273 def SetPassportSexGenderException(self, request, context, session): 

274 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

275 if not user: 

276 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

277 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception 

278 return _user_to_details(session, user) 

279 

280 def BanUser(self, request, context, session): 

281 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

282 if not user: 

283 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

284 append_admin_note(session, context, user, request.admin_note) 

285 user.is_banned = True 

286 return _user_to_details(session, user) 

287 

288 def UnbanUser(self, request, context, session): 

289 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

290 if not user: 

291 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

292 append_admin_note(session, context, user, request.admin_note) 

293 user.is_banned = False 

294 return _user_to_details(session, user) 

295 

296 def AddAdminNote(self, request, context, session): 

297 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

298 if not user: 

299 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

300 append_admin_note(session, context, user, request.admin_note) 

301 return _user_to_details(session, user) 

302 

303 def GetContentReport(self, request, context, session): 

304 content_report = session.execute( 

305 select(ContentReport).where(ContentReport.id == request.content_report_id) 

306 ).scalar_one_or_none() 

307 if not content_report: 

308 context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND) 

309 return admin_pb2.GetContentReportRes( 

310 content_report=_content_report_to_pb(content_report), 

311 ) 

312 

313 def GetContentReportsForAuthor(self, request, context, session): 

314 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

315 if not user: 

316 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

317 content_reports = ( 

318 session.execute( 

319 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc()) 

320 ) 

321 .scalars() 

322 .all() 

323 ) 

324 return admin_pb2.GetContentReportsForAuthorRes( 

325 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports], 

326 ) 

327 

328 def SendModNote(self, request, context, session): 

329 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

330 if not user: 

331 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

332 session.add( 

333 ModNote( 

334 user_id=user.id, 

335 internal_id=request.internal_id, 

336 creator_user_id=context.user_id, 

337 note_content=request.content, 

338 ) 

339 ) 

340 session.flush() 

341 

342 if not request.do_not_notify: 

343 notify( 

344 session, 

345 user_id=user.id, 

346 topic_action="modnote:create", 

347 ) 

348 

349 return _user_to_details(session, user) 

350 

351 def MarkUserNeedsLocationUpdate(self, request, context, session): 

352 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

353 if not user: 

354 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

355 user.needs_to_update_location = True 

356 return _user_to_details(session, user) 

357 

358 def DeleteUser(self, request, context, session): 

359 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

360 if not user: 

361 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

362 user.is_deleted = True 

363 return _user_to_details(session, user) 

364 

365 def RecoverDeletedUser(self, request, context, session): 

366 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

367 if not user: 

368 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

369 user.is_deleted = False 

370 return _user_to_details(session, user) 

371 

372 def CreateApiKey(self, request, context, session): 

373 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

374 if not user: 

375 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

376 token, expiry = create_session( 

377 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False 

378 ) 

379 

380 notify( 

381 session, 

382 user_id=user.id, 

383 topic_action="api_key:create", 

384 data=notification_data_pb2.ApiKeyCreate( 

385 api_key=token, 

386 expiry=Timestamp_from_datetime(expiry), 

387 ), 

388 ) 

389 

390 return _user_to_details(session, user) 

391 

392 def CreateCommunity(self, request, context, session): 

393 geom = load_community_geom(request.geojson, context) 

394 

395 parent_node_id = request.parent_node_id if request.parent_node_id != 0 else None 

396 node = create_node(session, geom, parent_node_id) 

397 create_cluster(session, node.id, request.name, request.description, context.user_id, request.admin_ids, True) 

398 

399 return community_to_pb(session, node, context) 

400 

401 def UpdateCommunity(self, request, context, session): 

402 node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none() 

403 if not node: 

404 context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND) 

405 cluster = node.official_cluster 

406 

407 if request.name: 

408 cluster.name = request.name 

409 

410 if request.description: 

411 cluster.description = request.description 

412 

413 if request.geojson: 

414 geom = load_community_geom(request.geojson, context) 

415 

416 node.geom = from_shape(geom) 

417 

418 if request.parent_node_id != 0: 

419 node.parent_node_id = request.parent_node_id 

420 

421 session.flush() 

422 

423 return community_to_pb(session, cluster.parent_node, context) 

424 

425 def GetChats(self, request, context, session): 

426 def format_user(user): 

427 return f"{user.name} ({user.username}, {user.id})" 

428 

429 def format_conversation(conversation_id): 

430 out = "" 

431 messages = ( 

432 session.execute( 

433 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc()) 

434 ) 

435 .scalars() 

436 .all() 

437 ) 

438 for message in messages: 

439 out += f"Message {message.id} by {format_user(message.author)} at {message.time}\nType={message.message_type}, host_req_status_change={message.host_request_status_target}\n\n" 

440 out += str(message.text) 

441 out += "\n\n-----\n" 

442 out += "\n\n\n\n" 

443 return out 

444 

445 def format_host_request(host_request_id): 

446 out = "" 

447 host_request = session.execute( 

448 select(HostRequest).where(HostRequest.conversation_id == host_request_id) 

449 ).scalar_one() 

450 out += "==============================\n" 

451 out += f"Host request {host_request.conversation_id} from {format_user(host_request.surfer)} to {format_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n" 

452 out += format_conversation(host_request.conversation_id) 

453 out += "\n\n\n\n" 

454 return out 

455 

456 def format_group_chat(group_chat_id): 

457 out = "" 

458 group_chat = session.execute( 

459 select(GroupChat).where(GroupChat.conversation_id == group_chat_id) 

460 ).scalar_one() 

461 out += "==============================\n" 

462 out += f"Group chat {group_chat.conversation_id}. Created by {format_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n" 

463 subs = ( 

464 session.execute( 

465 select(GroupChatSubscription) 

466 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id) 

467 .order_by(GroupChatSubscription.joined.asc()) 

468 ) 

469 .scalars() 

470 .all() 

471 ) 

472 for sub in subs: 

473 out += f"{format_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n" 

474 out += "\n\nMessages:\n" 

475 out += format_conversation(group_chat.conversation_id) 

476 out += "\n\n\n\n" 

477 return out 

478 

479 def format_all_chats_for_user(user_id): 

480 out = "" 

481 user = session.execute(select(User).where(User.id == user_id)).scalar_one() 

482 out += f"Chats for user {format_user(user)}\n" 

483 host_request_ids = ( 

484 session.execute( 

485 select(HostRequest.conversation_id) 

486 .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id)) 

487 .order_by(HostRequest.conversation_id.desc()) 

488 ) 

489 .scalars() 

490 .all() 

491 ) 

492 out += f"************************************* Requests ({len(host_request_ids)})\n" 

493 for host_request in host_request_ids: 

494 out += format_host_request(host_request) 

495 group_chat_ids = ( 

496 session.execute( 

497 select(GroupChatSubscription.group_chat_id) 

498 .where(GroupChatSubscription.user_id == user_id) 

499 .order_by(GroupChatSubscription.joined.desc()) 

500 ) 

501 .scalars() 

502 .all() 

503 ) 

504 out += f"************************************* Group chats ({len(group_chat_ids)})\n" 

505 for group_chat_id in group_chat_ids: 

506 out += format_group_chat(group_chat_id) 

507 return out 

508 

509 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

510 if not user: 

511 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

512 

513 return admin_pb2.GetChatsRes(response=format_all_chats_for_user(user.id)) 

514 

515 def ListEventCommunityInviteRequests(self, request, context, session): 

516 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

517 next_request_id = int(request.page_token) if request.page_token else 0 

518 requests = ( 

519 session.execute( 

520 select(EventCommunityInviteRequest) 

521 .where(EventCommunityInviteRequest.approved.is_(None)) 

522 .where(EventCommunityInviteRequest.id >= next_request_id) 

523 .order_by(EventCommunityInviteRequest.id) 

524 .limit(page_size + 1) 

525 ) 

526 .scalars() 

527 .all() 

528 ) 

529 

530 def _request_to_pb(request): 

531 users_to_notify, node_id = get_users_to_notify_for_new_event(session, request.occurrence) 

532 return admin_pb2.EventCommunityInviteRequest( 

533 event_community_invite_request_id=request.id, 

534 user_id=request.user_id, 

535 event_url=urls.event_link(occurrence_id=request.occurrence.id, slug=request.occurrence.event.slug), 

536 approx_users_to_notify=len(users_to_notify), 

537 community_id=node_id, 

538 ) 

539 

540 return admin_pb2.ListEventCommunityInviteRequestsRes( 

541 requests=[_request_to_pb(request) for request in requests[:page_size]], 

542 next_page_token=str(requests[-1].id) if len(requests) > page_size else None, 

543 ) 

544 

545 def DecideEventCommunityInviteRequest(self, request, context, session): 

546 req = session.execute( 

547 select(EventCommunityInviteRequest).where( 

548 EventCommunityInviteRequest.id == request.event_community_invite_request_id 

549 ) 

550 ).scalar_one_or_none() 

551 

552 if not req: 

553 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND) 

554 

555 if req.decided: 

556 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED) 

557 

558 decided = now() 

559 req.decided = decided 

560 req.decided_by_user_id = context.user_id 

561 req.approved = request.approve 

562 

563 # deny other reqs for the same event 

564 if request.approve: 

565 session.execute( 

566 update(EventCommunityInviteRequest) 

567 .where(EventCommunityInviteRequest.occurrence_id == req.occurrence_id) 

568 .where(EventCommunityInviteRequest.decided.is_(None)) 

569 .values(decided=decided, decided_by_user_id=context.user_id, approved=False) 

570 ) 

571 

572 session.flush() 

573 

574 if request.approve: 

575 queue_job( 

576 session, 

577 "generate_event_create_notifications", 

578 payload=jobs_pb2.GenerateEventCreateNotificationsPayload( 

579 inviting_user_id=req.user_id, 

580 occurrence_id=req.occurrence_id, 

581 approved=True, 

582 ), 

583 ) 

584 

585 return admin_pb2.DecideEventCommunityInviteRequestRes() 

586 

587 def DeleteEvent(self, request, context, session): 

588 res = session.execute( 

589 select(Event, EventOccurrence) 

590 .where(EventOccurrence.id == request.event_id) 

591 .where(EventOccurrence.event_id == Event.id) 

592 .where(~EventOccurrence.is_deleted) 

593 ).one_or_none() 

594 

595 if not res: 

596 context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND) 

597 

598 event, occurrence = res 

599 

600 occurrence.is_deleted = True 

601 

602 queue_job( 

603 session, 

604 "generate_event_delete_notifications", 

605 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload( 

606 occurrence_id=occurrence.id, 

607 ), 

608 ) 

609 

610 return empty_pb2.Empty() 

611 

612 def ListUserIds(self, request, context, session): 

613 start_date = request.start_time.ToDatetime() 

614 end_date = request.end_time.ToDatetime() 

615 

616 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

617 next_user_id = int(request.page_token) if request.page_token else 0 

618 

619 user_ids = ( 

620 session.execute( 

621 select(User.id) 

622 .where(or_(User.id <= next_user_id, next_user_id == 0)) 

623 .where(User.joined >= start_date) 

624 .where(User.joined <= end_date) 

625 .order_by(User.id.desc()) 

626 .limit(page_size + 1) 

627 ) 

628 .scalars() 

629 .all() 

630 ) 

631 

632 return admin_pb2.ListUserIdsRes( 

633 user_ids=user_ids[:page_size], 

634 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None, 

635 ) 

636 

637 def EditReferenceText(self, request, context, session): 

638 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none() 

639 

640 if reference is None: 

641 context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND) 

642 

643 if not request.new_text.strip(): 

644 context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.REFERENCE_NO_TEXT) 

645 

646 reference.text = request.new_text.strip() 

647 return empty_pb2.Empty() 

648 

649 def DeleteReference(self, request, context, session): 

650 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none() 

651 

652 if reference is None: 

653 context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND) 

654 

655 reference.is_deleted = True 

656 return empty_pb2.Empty() 

657 

658 def EditDiscussion(self, request, context, session): 

659 discussion = session.execute( 

660 select(Discussion).where(Discussion.id == request.discussion_id) 

661 ).scalar_one_or_none() 

662 if not discussion: 

663 context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND) 

664 if request.new_title: 

665 discussion.title = request.new_title.strip() 

666 if request.new_content: 

667 discussion.content = request.new_content.strip() 

668 return empty_pb2.Empty() 

669 

670 def EditReply(self, request, context, session): 

671 database_id, depth = unpack_thread_id(request.reply_id) 

672 if depth == 1: 

673 obj = session.execute(select(Comment).where(Comment.id == database_id)).scalar_one_or_none() 

674 elif depth == 2: 

675 obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none() 

676 if not obj: 

677 context.abort(grpc.StatusCode.NOT_FOUND, errors.OBJECT_NOT_FOUND) 

678 obj.content = request.new_content.strip() 

679 return empty_pb2.Empty() 

680 

681 def CreateAccountDeletionLink(self, request, context, session): 

682 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

683 if not user: 

684 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

685 expiry_days = request.expiry_days or 7 

686 token = AccountDeletionToken(token=urlsafe_secure_token(), user=user, expiry=now() + timedelta(hours=2)) 

687 session.add(token) 

688 return admin_pb2.CreateAccountDeletionLinkRes( 

689 account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token) 

690 ) 

691 

692 def AccessStats(self, request, context, session): 

693 user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none() 

694 if not user: 

695 context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND) 

696 

697 start_time = to_aware_datetime(request.start_time) if request.start_time else now() - timedelta(days=90) 

698 end_time = to_aware_datetime(request.end_time) if request.end_time else now() 

699 

700 user_activity = session.execute( 

701 select( 

702 UserActivity.ip_address, 

703 UserActivity.user_agent, 

704 func.sum(UserActivity.api_calls), 

705 func.count(UserActivity.period), 

706 func.min(UserActivity.period), 

707 func.max(UserActivity.period), 

708 ) 

709 .where(UserActivity.user_id == user.id) 

710 .where(UserActivity.period >= start_time) 

711 .where(UserActivity.period >= end_time) 

712 .order_by(func.max(UserActivity.period).desc()) 

713 .group_by(UserActivity.ip_address, UserActivity.user_agent) 

714 ).all() 

715 

716 out = admin_pb2.AccessStatsRes() 

717 

718 for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity: 

719 user_agent_data = user_agents_parse(user_agent or "") 

720 asn = geoip_asn(ip_address) 

721 out.stats.append( 

722 admin_pb2.AccessStat( 

723 ip_address=ip_address, 

724 asn=str(asn[0]) if asn else None, 

725 asorg=str(asn[1]) if asn else None, 

726 asnetwork=str(asn[2]) if asn else None, 

727 user_agent=user_agent, 

728 operating_system=user_agent_data.os.family, 

729 browser=user_agent_data.browser.family, 

730 device=user_agent_data.device.family, 

731 approximate_location=geoip_approximate_location(ip_address) or "Unknown", 

732 api_call_count=api_call_count, 

733 periods_count=periods_count, 

734 first_seen=Timestamp_from_datetime(first_seen), 

735 last_seen=Timestamp_from_datetime(last_seen), 

736 ) 

737 ) 

738 

739 return out 

740 

741 def SendBlogPostNotification(self, request, context, session): 

742 if len(request.title) > 50: 

743 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_TITLE_TOO_LONG) 

744 if len(request.blurb) > 100: 

745 context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_BLURB_TOO_LONG) 

746 queue_job( 

747 session, 

748 "generate_new_blog_post_notifications", 

749 payload=jobs_pb2.GenerateNewBlogPostNotificationsPayload( 

750 url=request.url, 

751 title=request.title, 

752 blurb=request.blurb, 

753 ), 

754 ) 

755 return empty_pb2.Empty()