Coverage for src/couchers/servicers/admin.py: 69%

435 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-08-28 14:55 +0000

1import json 

2import logging 

3from datetime import timedelta 

4 

5import grpc 

6from geoalchemy2.shape import from_shape 

7from google.protobuf import empty_pb2 

8from shapely.geometry import shape 

9from sqlalchemy.sql import and_, func, or_, select, update 

10from user_agents import parse as user_agents_parse 

11 

12from couchers import errors, urls 

13from couchers.context import make_background_user_context 

14from couchers.crypto import urlsafe_secure_token 

15from couchers.db import session_scope 

16from couchers.helpers.badges import user_add_badge, user_remove_badge 

17from couchers.helpers.clusters import create_cluster, create_node 

18from couchers.helpers.geoip import geoip_approximate_location, geoip_asn 

19from couchers.helpers.strong_verification import get_strong_verification_fields 

20from couchers.jobs.enqueue import queue_job 

21from couchers.models import ( 

22 AccountDeletionToken, 

23 Comment, 

24 ContentReport, 

25 Discussion, 

26 Event, 

27 EventCommunityInviteRequest, 

28 EventOccurrence, 

29 GroupChat, 

30 GroupChatSubscription, 

31 HostRequest, 

32 LanguageAbility, 

33 Message, 

34 ModerationUserList, 

35 ModNote, 

36 Node, 

37 Reference, 

38 Reply, 

39 User, 

40 UserActivity, 

41 UserBadge, 

42) 

43from couchers.notifications.notify import notify 

44from couchers.resources import get_badge_dict 

45from couchers.servicers.api import user_model_to_pb 

46from couchers.servicers.auth import create_session 

47from couchers.servicers.communities import community_to_pb 

48from couchers.servicers.events import get_users_to_notify_for_new_event 

49from couchers.servicers.threads import unpack_thread_id 

50from couchers.sql import couchers_select as select 

51from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime 

52from proto import admin_pb2, admin_pb2_grpc, notification_data_pb2 

53from proto.internal import jobs_pb2 

54 

55logger = logging.getLogger(__name__) 

56 

57MAX_PAGINATION_LENGTH = 250 

58 

59 

def _user_to_details(session, user):
    """Build the ``admin_pb2.UserDetails`` message for ``user``.

    Copies the basic account fields off the model, merges in whatever
    ``get_strong_verification_fields`` returns for the user, and adds the
    counts of the user's pending and non-pending mod notes.
    """
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    verification_fields = get_strong_verification_fields(session, user)
    pending_notes = user.mod_notes.where(ModNote.is_pending).count()
    acknowledged_notes = user.mod_notes.where(~ModNote.is_pending).count()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        **verification_fields,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        pending_mod_notes_count=pending_notes,
        acknowledged_mod_notes_count=acknowledged_notes,
    )

78 

79 

def _content_report_to_pb(content_report: ContentReport):
    """Convert a ``ContentReport`` row into an ``admin_pb2.ContentReport`` message."""
    reported_at = Timestamp_from_datetime(content_report.time)
    return admin_pb2.ContentReport(
        content_report_id=content_report.id,
        time=reported_at,
        reporting_user_id=content_report.reporting_user_id,
        author_user_id=content_report.author_user_id,
        reason=content_report.reason,
        description=content_report.description,
        content_ref=content_report.content_ref,
        user_agent=content_report.user_agent,
        page=content_report.page,
    )

92 

93 

def append_admin_note(session, context, user, note):
    """Append a timestamped, attributed entry to ``user.admin_note``.

    Aborts with INVALID_ARGUMENT when ``note`` is empty or whitespace-only.
    The entry records the id and username of the user behind ``context.user_id``.
    """
    if note.strip() == "":
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.ADMIN_NOTE_CANT_BE_EMPTY)

    author = session.execute(select(User).where(User.id == context.user_id)).scalar_one()
    timestamp = now().isoformat()
    entry = f"\n[{timestamp}] (id: {author.id}, username: {author.username}) {note}\n"
    user.admin_note += entry

99 

100 

def load_community_geom(geojson, context):
    """Parse a GeoJSON string into a shapely geometry, requiring a MultiPolygon.

    Aborts with INVALID_ARGUMENT when the parsed geometry has any other type.
    """
    parsed = json.loads(geojson)
    geometry = shape(parsed)

    is_multipolygon = geometry.geom_type == "MultiPolygon"
    if not is_multipolygon:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.NO_MULTIPOLYGON)

    return geometry

108 

109 

def generate_new_blog_post_notifications(payload: jobs_pb2.GenerateNewBlogPostNotificationsPayload):
    """Send a ``general:new_blog_post`` notification to every visible user.

    A job named after this function is queued by ``Admin.SendBlogPostNotification``
    with a payload carrying the same ``url``, ``title`` and ``blurb`` fields.

    Only user ids are fetched, since nothing else about the user is needed;
    loading full ``User`` rows for the whole user base was wasted work. The
    previously created (and never used) background context has been dropped.
    """
    with session_scope() as session:
        visible_user_ids = session.execute(select(User.id).where(User.is_visible)).scalars().all()
        for user_id in visible_user_ids:
            notify(
                session,
                user_id=user_id,
                topic_action="general:new_blog_post",
                data=notification_data_pb2.GeneralNewBlogPost(
                    url=payload.url,
                    title=payload.title,
                    blurb=payload.blurb,
                ),
            )

125 

126 

127class Admin(admin_pb2_grpc.AdminServicer): 

def GetUserDetails(self, request, context, session):
    """Return admin-facing details for the user identified by ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    found = session.execute(lookup).scalar_one_or_none()
    if found is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    return _user_to_details(session, found)

133 

def GetUser(self, request, context, session):
    """Return the ``user_model_to_pb`` representation of the user identified by ``request.user``.

    Aborts with NOT_FOUND when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    found = session.execute(lookup).scalar_one_or_none()
    if found is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    return user_model_to_pb(found, session, context)

139 

def SearchUsers(self, request, context, session):
    """Search users by the optional filters set on ``request``, paginated by user id.

    Every filter that is set is ANDed together. Results are ordered by
    ascending user id; ``request.page_token`` (when present) is the lowest user
    id to include, and ``next_page_token`` is the id of the first user of the
    following page.

    Changes from the previous implementation:
    * the gender filter was applied twice; it is now applied once;
    * the language filter used a JOIN, which returns a user once per matching
      language and so produced duplicate rows and skewed pagination; it is now
      an ``IN`` subquery so each user appears at most once;
    * a leftover ``logger.info(users)`` that dumped the result list was removed;
    * NULL checks use ``is_`` / ``is_not`` instead of ``== None`` / ``!= None``.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    statement = select(User)

    # Text filters: the request value is passed to ILIKE as the pattern unchanged.
    text_filters = (
        (User.username, request.username),
        (User.email, request.email),
        (User.name, request.name),
        (User.admin_note, request.admin_note),
        (User.city, request.city),
    )
    for column, pattern in text_filters:
        if pattern:
            statement = statement.where(column.ilike(pattern))

    if request.min_user_id:
        statement = statement.where(User.id >= request.min_user_id)
    if request.max_user_id:
        statement = statement.where(User.id <= request.max_user_id)

    # Inclusive date-range filters; each bound is parsed with parse_date.
    date_ranges = (
        (User.birthdate, request.min_birthdate, request.max_birthdate),
        (User.joined, request.min_joined_date, request.max_joined_date),
        (User.last_active, request.min_last_active_date, request.max_last_active_date),
    )
    for column, lower, upper in date_ranges:
        if lower:
            statement = statement.where(column >= parse_date(lower))
        if upper:
            statement = statement.where(column <= parse_date(upper))

    if request.genders:
        statement = statement.where(User.gender.in_(request.genders))

    if request.language_codes:
        # Subquery rather than JOIN so a user with several matching languages is returned once.
        users_with_language = select(LanguageAbility.user_id).where(
            LanguageAbility.language_code.in_(request.language_codes)
        )
        statement = statement.where(User.id.in_(users_with_language))

    # Wrapper-typed flags: only filter when the caller actually set them.
    if request.HasField("is_deleted"):
        statement = statement.where(User.is_deleted == request.is_deleted.value)
    if request.HasField("is_banned"):
        statement = statement.where(User.is_banned == request.is_banned.value)
    if request.HasField("has_avatar"):
        if request.has_avatar.value:
            statement = statement.where(User.avatar_key.is_not(None))
        else:
            statement = statement.where(User.avatar_key.is_(None))

    # Fetch one extra row to learn whether another page exists.
    users = (
        session.execute(statement.where(User.id >= next_user_id).order_by(User.id).limit(page_size + 1))
        .scalars()
        .all()
    )
    return admin_pb2.SearchUsersRes(
        users=[_user_to_details(session, user) for user in users[:page_size]],
        next_page_token=str(users[-1].id) if len(users) > page_size else None,
    )

198 

def ChangeUserGender(self, request, context, session):
    """Set the user's gender, commit, send a ``gender:change`` notification, and return their details.

    Aborts with NOT_FOUND when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.gender = request.gender
    session.commit()

    change = notification_data_pb2.GenderChange(gender=request.gender)
    notify(session, user_id=target.id, topic_action="gender:change", data=change)

    return _user_to_details(session, target)

216 

def ChangeUserBirthdate(self, request, context, session):
    """Set the user's birthdate, commit, send a ``birthdate:change`` notification, and return their details.

    Aborts with NOT_FOUND when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.birthdate = parse_date(request.birthdate)
    session.commit()

    # The notification carries the birthdate exactly as it arrived in the request.
    change = notification_data_pb2.BirthdateChange(birthdate=request.birthdate)
    notify(session, user_id=target.id, topic_action="birthdate:change", data=change)

    return _user_to_details(session, target)

234 

def AddBadge(self, request, context, session):
    """Give the user the badge ``request.badge_id`` and return their details.

    Aborts with NOT_FOUND for an unknown user or badge, and with
    FAILED_PRECONDITION when the badge is not admin-editable or the user
    already has it.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

    if not badge["admin_editable"]:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

    already_has_badge = any(existing.badge_id == badge["id"] for existing in target.badges)
    if already_has_badge:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_ALREADY_HAS_BADGE)

    user_add_badge(session, target.id, request.badge_id)

    return _user_to_details(session, target)

253 

def RemoveBadge(self, request, context, session):
    """Take the badge ``request.badge_id`` away from the user and return their details.

    Aborts with NOT_FOUND for an unknown user or badge, and with
    FAILED_PRECONDITION when the badge is not admin-editable or the user does
    not have it.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.BADGE_NOT_FOUND)

    if not badge["admin_editable"]:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_CANNOT_EDIT_BADGE)

    held_badge_query = select(UserBadge).where(
        UserBadge.user_id == target.id, UserBadge.badge_id == badge["id"]
    )
    held_badge = session.execute(held_badge_query).scalar_one_or_none()
    if held_badge is None:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_DOES_NOT_HAVE_BADGE)

    user_remove_badge(session, target.id, request.badge_id)

    return _user_to_details(session, target)

275 

def SetPassportSexGenderException(self, request, context, session):
    """Set the user's ``has_passport_sex_gender_exception`` flag from the request and return their details.

    Aborts with NOT_FOUND when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.has_passport_sex_gender_exception = request.passport_sex_gender_exception
    return _user_to_details(session, target)

282 

def BanUser(self, request, context, session):
    """Ban the user, recording ``request.admin_note`` on their admin note first.

    Aborts with NOT_FOUND when no user matches; ``append_admin_note`` rejects
    an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    target.is_banned = True
    return _user_to_details(session, target)

290 

def UnbanUser(self, request, context, session):
    """Lift the user's ban, recording ``request.admin_note`` on their admin note first.

    Aborts with NOT_FOUND when no user matches; ``append_admin_note`` rejects
    an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    target.is_banned = False
    return _user_to_details(session, target)

298 

def AddAdminNote(self, request, context, session):
    """Append ``request.admin_note`` to the user's admin note and return their details.

    Aborts with NOT_FOUND when no user matches; ``append_admin_note`` rejects
    an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    append_admin_note(session, context, target, request.admin_note)
    return _user_to_details(session, target)

305 

def GetContentReport(self, request, context, session):
    """Return the content report with id ``request.content_report_id``.

    Aborts with NOT_FOUND when no such report exists.
    """
    query = select(ContentReport).where(ContentReport.id == request.content_report_id)
    report = session.execute(query).scalar_one_or_none()
    if report is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.CONTENT_REPORT_NOT_FOUND)

    report_pb = _content_report_to_pb(report)
    return admin_pb2.GetContentReportRes(content_report=report_pb)

315 

def GetContentReportsForAuthor(self, request, context, session):
    """Return all content reports whose author is the given user, highest report id first.

    Aborts with NOT_FOUND when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    author = session.execute(lookup).scalar_one_or_none()
    if author is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    reports_query = (
        select(ContentReport).where(ContentReport.author_user_id == author.id).order_by(ContentReport.id.desc())
    )
    report_pbs = []
    for report in session.execute(reports_query).scalars().all():
        report_pbs.append(_content_report_to_pb(report))

    return admin_pb2.GetContentReportsForAuthorRes(content_reports=report_pbs)

330 

def SendModNote(self, request, context, session):
    """Create a mod note for the user and, unless ``request.do_not_notify`` is set, notify them.

    The note is attributed to ``context.user_id``. Aborts with NOT_FOUND when
    no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    recipient = session.execute(lookup).scalar_one_or_none()
    if recipient is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    mod_note = ModNote(
        user_id=recipient.id,
        internal_id=request.internal_id,
        creator_user_id=context.user_id,
        note_content=request.content,
    )
    session.add(mod_note)
    # Flush so the note is written before the notification and the detail counts are produced.
    session.flush()

    should_notify = not request.do_not_notify
    if should_notify:
        notify(session, user_id=recipient.id, topic_action="modnote:create")

    return _user_to_details(session, recipient)

353 

def MarkUserNeedsLocationUpdate(self, request, context, session):
    """Set ``needs_to_update_location`` on the user and return their details.

    Aborts with NOT_FOUND when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.needs_to_update_location = True
    return _user_to_details(session, target)

360 

def DeleteUser(self, request, context, session):
    """Mark the user as deleted (sets ``is_deleted``) and return their details.

    Aborts with NOT_FOUND when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.is_deleted = True
    return _user_to_details(session, target)

367 

def RecoverDeletedUser(self, request, context, session):
    """Clear the user's ``is_deleted`` flag and return their details.

    Aborts with NOT_FOUND when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    target.is_deleted = False
    return _user_to_details(session, target)

374 

def CreateApiKey(self, request, context, session):
    """Create a 365-day API-key session for the user and send it to them via notification.

    The token itself is delivered only in the ``api_key:create`` notification;
    the RPC returns the user's details. Aborts with NOT_FOUND when no user
    matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    owner = session.execute(lookup).scalar_one_or_none()
    if owner is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    api_key, expires_at = create_session(
        context,
        session,
        owner,
        long_lived=True,
        is_api_key=True,
        duration=timedelta(days=365),
        set_cookie=False,
    )

    key_data = notification_data_pb2.ApiKeyCreate(
        api_key=api_key,
        expiry=Timestamp_from_datetime(expires_at),
    )
    notify(session, user_id=owner.id, topic_action="api_key:create", data=key_data)

    return _user_to_details(session, owner)

394 

def CreateCommunity(self, request, context, session):
    """Create a node from the request's GeoJSON plus a cluster on it, and return the community.

    A ``parent_node_id`` of 0 means "no parent". The geometry must be a
    MultiPolygon (enforced by ``load_community_geom``).
    """
    geometry = load_community_geom(request.geojson, context)

    # 0 is the unset value for the field, so map it to None.
    parent_id = request.parent_node_id or None
    new_node = create_node(session, geometry, parent_id)
    # NOTE(review): the trailing positional True is presumably an "official cluster" flag - confirm against create_cluster.
    create_cluster(
        session,
        new_node.id,
        request.name,
        request.description,
        context.user_id,
        request.admin_ids,
        True,
    )

    return community_to_pb(session, new_node, context)

403 

def UpdateCommunity(self, request, context, session):
    """Update whichever of name, description, geometry and parent node the request sets.

    Empty strings and a ``parent_node_id`` of 0 leave the corresponding value
    unchanged. Aborts with NOT_FOUND when the community node does not exist.
    """
    community_node = session.execute(select(Node).where(Node.id == request.community_id)).scalar_one_or_none()
    if community_node is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.COMMUNITY_NOT_FOUND)
    official = community_node.official_cluster

    # Name and description live on the node's official cluster.
    if request.name:
        official.name = request.name
    if request.description:
        official.description = request.description

    # Geometry and parent live on the node itself.
    if request.geojson:
        new_geometry = load_community_geom(request.geojson, context)
        community_node.geom = from_shape(new_geometry)
    if request.parent_node_id != 0:
        community_node.parent_node_id = request.parent_node_id

    session.flush()

    return community_to_pb(session, official.parent_node, context)

427 

def GetChats(self, request, context, session):
    """Return a plain-text dump of every host request and group chat involving the user.

    Host requests are listed by descending conversation id and group chats by
    descending subscription join time; each is followed by its messages in
    ascending message id order. Aborts with NOT_FOUND when no user matches
    ``request.user``.
    """

    def describe_user(user):
        return f"{user.name} ({user.username}, {user.id})"

    def render_messages(conversation_id):
        # All messages of one conversation, oldest first.
        messages = (
            session.execute(
                select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
            )
            .scalars()
            .all()
        )
        parts = []
        for msg in messages:
            parts.append(
                f"Message {msg.id} by {describe_user(msg.author)} at {msg.time}\nType={msg.message_type}, host_req_status_change={msg.host_request_status_target}\n\n"
            )
            parts.append(str(msg.text))
            parts.append("\n\n-----\n")
        parts.append("\n\n\n\n")
        return "".join(parts)

    def render_host_request(host_request_id):
        host_request = session.execute(
            select(HostRequest).where(HostRequest.conversation_id == host_request_id)
        ).scalar_one()
        parts = [
            "==============================\n",
            f"Host request {host_request.conversation_id} from {describe_user(host_request.surfer)} to {describe_user(host_request.host)}.\nCurrent state = {host_request.status}\n\nMessages:\n",
            render_messages(host_request.conversation_id),
            "\n\n\n\n",
        ]
        return "".join(parts)

    def render_group_chat(group_chat_id):
        group_chat = session.execute(
            select(GroupChat).where(GroupChat.conversation_id == group_chat_id)
        ).scalar_one()
        parts = [
            "==============================\n",
            f"Group chat {group_chat.conversation_id}. Created by {describe_user(group_chat.creator)}, is_dm={group_chat.is_dm}\nName: {group_chat.title}\nMembers:\n",
        ]
        # Members, in the order they joined.
        subscriptions = (
            session.execute(
                select(GroupChatSubscription)
                .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
                .order_by(GroupChatSubscription.joined.asc())
            )
            .scalars()
            .all()
        )
        for sub in subscriptions:
            parts.append(f"{describe_user(sub.user)} joined at {sub.joined} (left at {sub.left}), role={sub.role}\n")
        parts.append("\n\nMessages:\n")
        parts.append(render_messages(group_chat.conversation_id))
        parts.append("\n\n\n\n")
        return "".join(parts)

    def render_all_chats(user_id):
        subject = session.execute(select(User).where(User.id == user_id)).scalar_one()
        parts = [f"Chats for user {describe_user(subject)}\n"]

        # Host requests where the user is either host or surfer.
        host_request_ids = (
            session.execute(
                select(HostRequest.conversation_id)
                .where(or_(HostRequest.host_user_id == user_id, HostRequest.surfer_user_id == user_id))
                .order_by(HostRequest.conversation_id.desc())
            )
            .scalars()
            .all()
        )
        parts.append(f"************************************* Requests ({len(host_request_ids)})\n")
        for host_request_id in host_request_ids:
            parts.append(render_host_request(host_request_id))

        # Group chats the user has a subscription to.
        group_chat_ids = (
            session.execute(
                select(GroupChatSubscription.group_chat_id)
                .where(GroupChatSubscription.user_id == user_id)
                .order_by(GroupChatSubscription.joined.desc())
            )
            .scalars()
            .all()
        )
        parts.append(f"************************************* Group chats ({len(group_chat_ids)})\n")
        for group_chat_id in group_chat_ids:
            parts.append(render_group_chat(group_chat_id))

        return "".join(parts)

    lookup = select(User).where_username_or_email_or_id(request.user)
    target = session.execute(lookup).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    return admin_pb2.GetChatsRes(response=render_all_chats(target.id))

517 

def ListEventCommunityInviteRequests(self, request, context, session):
    """List undecided event community invite requests (``approved`` is NULL), paginated by id.

    ``request.page_token`` is the lowest request id to include;
    ``next_page_token`` is the id of the first request of the following page.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    first_id = int(request.page_token) if request.page_token else 0

    # Fetch one extra row to learn whether another page exists.
    pending_query = (
        select(EventCommunityInviteRequest)
        .where(EventCommunityInviteRequest.approved.is_(None))
        .where(EventCommunityInviteRequest.id >= first_id)
        .order_by(EventCommunityInviteRequest.id)
        .limit(page_size + 1)
    )
    pending = session.execute(pending_query).scalars().all()

    def _invite_to_pb(invite):
        users_to_notify, node_id = get_users_to_notify_for_new_event(session, invite.occurrence)
        return admin_pb2.EventCommunityInviteRequest(
            event_community_invite_request_id=invite.id,
            user_id=invite.user_id,
            event_url=urls.event_link(occurrence_id=invite.occurrence.id, slug=invite.occurrence.event.slug),
            approx_users_to_notify=len(users_to_notify),
            community_id=node_id,
        )

    page = []
    for invite in pending[:page_size]:
        page.append(_invite_to_pb(invite))

    has_more = len(pending) > page_size
    return admin_pb2.ListEventCommunityInviteRequestsRes(
        requests=page,
        next_page_token=str(pending[-1].id) if has_more else None,
    )

547 

def DecideEventCommunityInviteRequest(self, request, context, session):
    """Approve or deny an event community invite request.

    On approval, every other still-undecided request for the same occurrence
    is denied and a ``generate_event_create_notifications`` job is queued.
    Aborts with NOT_FOUND for an unknown request and FAILED_PRECONDITION when
    it was already decided.
    """
    invite_query = select(EventCommunityInviteRequest).where(
        EventCommunityInviteRequest.id == request.event_community_invite_request_id
    )
    invite = session.execute(invite_query).scalar_one_or_none()

    if invite is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_COMMUNITY_INVITE_NOT_FOUND)

    if invite.decided:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.EVENT_COMMUNITY_INVITE_ALREADY_DECIDED)

    decided_at = now()
    approving = request.approve

    invite.decided = decided_at
    invite.decided_by_user_id = context.user_id
    invite.approved = approving

    if approving:
        # Deny the remaining undecided requests for the same occurrence.
        deny_others = (
            update(EventCommunityInviteRequest)
            .where(EventCommunityInviteRequest.occurrence_id == invite.occurrence_id)
            .where(EventCommunityInviteRequest.decided.is_(None))
            .values(decided=decided_at, decided_by_user_id=context.user_id, approved=False)
        )
        session.execute(deny_others)

    session.flush()

    if approving:
        job_payload = jobs_pb2.GenerateEventCreateNotificationsPayload(
            inviting_user_id=invite.user_id,
            occurrence_id=invite.occurrence_id,
            approved=True,
        )
        queue_job(session, "generate_event_create_notifications", payload=job_payload)

    return admin_pb2.DecideEventCommunityInviteRequestRes()

589 

def DeleteEvent(self, request, context, session):
    """Mark an event occurrence as deleted and queue the delete-notification job.

    ``request.event_id`` is matched against the occurrence id. Aborts with
    NOT_FOUND when there is no such non-deleted occurrence.
    """
    occurrence_query = (
        select(Event, EventOccurrence)
        .where(EventOccurrence.id == request.event_id)
        .where(EventOccurrence.event_id == Event.id)
        .where(~EventOccurrence.is_deleted)
    )
    row = session.execute(occurrence_query).one_or_none()

    if row is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.EVENT_NOT_FOUND)

    # Only the occurrence is modified; the parent event is fetched but not used.
    _event, occurrence = row
    occurrence.is_deleted = True

    job_payload = jobs_pb2.GenerateEventDeleteNotificationsPayload(occurrence_id=occurrence.id)
    queue_job(session, "generate_event_delete_notifications", payload=job_payload)

    return empty_pb2.Empty()

614 

def ListUserIds(self, request, context, session):
    """List ids of users who joined within ``[start_time, end_time]``, highest id first.

    ``request.page_token`` is the highest user id to include;
    ``next_page_token`` is the id of the first user of the following page.

    The bounds are converted with ``to_aware_datetime`` (as ``AccessStats``
    does) rather than ``Timestamp.ToDatetime()``, which yields naive datetimes.
    NOTE(review): this assumes ``User.joined`` is a timezone-aware column - confirm.
    """
    start_date = to_aware_datetime(request.start_time)
    end_date = to_aware_datetime(request.end_time)

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    statement = select(User.id).where(User.joined >= start_date).where(User.joined <= end_date)
    # No page token (0) means "start from the newest user", so no id bound is applied.
    if next_user_id != 0:
        statement = statement.where(User.id <= next_user_id)

    # Fetch one extra row to learn whether another page exists.
    user_ids = session.execute(statement.order_by(User.id.desc()).limit(page_size + 1)).scalars().all()

    return admin_pb2.ListUserIdsRes(
        user_ids=user_ids[:page_size],
        next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
    )

639 

def EditReferenceText(self, request, context, session):
    """Replace a reference's text with the whitespace-stripped ``request.new_text``.

    Aborts with NOT_FOUND for an unknown reference and INVALID_ARGUMENT when
    the new text is empty after stripping.
    """
    query = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(query).scalar_one_or_none()

    if reference is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)

    cleaned_text = request.new_text.strip()
    if not cleaned_text:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.REFERENCE_NO_TEXT)

    reference.text = cleaned_text
    return empty_pb2.Empty()

651 

def DeleteReference(self, request, context, session):
    """Mark the reference ``request.reference_id`` as deleted.

    Aborts with NOT_FOUND when no such reference exists.
    """
    query = select(Reference).where(Reference.id == request.reference_id)
    target_reference = session.execute(query).scalar_one_or_none()

    if target_reference is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.REFERENCE_NOT_FOUND)

    target_reference.is_deleted = True
    return empty_pb2.Empty()

660 

def EditDiscussion(self, request, context, session):
    """Update a discussion's title and/or content from the request.

    Each field is only changed when the request provides a non-empty value;
    values are stripped of surrounding whitespace. Aborts with NOT_FOUND when
    the discussion does not exist.
    """
    query = select(Discussion).where(Discussion.id == request.discussion_id)
    target = session.execute(query).scalar_one_or_none()
    if target is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.DISCUSSION_NOT_FOUND)

    title, content = request.new_title, request.new_content
    if title:
        target.title = title.strip()
    if content:
        target.content = content.strip()
    return empty_pb2.Empty()

672 

def EditReply(self, request, context, session):
    """Replace the content of a comment (thread depth 1) or reply (depth 2).

    ``request.reply_id`` is unpacked into a database id and a depth by
    ``unpack_thread_id``. Aborts with NOT_FOUND when the object does not
    exist or the depth is neither 1 nor 2.

    Previously an unexpected depth left ``obj`` unassigned and the method
    raised ``UnboundLocalError`` instead of returning a proper gRPC error.
    """
    database_id, depth = unpack_thread_id(request.reply_id)

    # Depth selects which table the id refers to.
    model_by_depth = {1: Comment, 2: Reply}
    model = model_by_depth.get(depth)

    obj = None
    if model is not None:
        obj = session.execute(select(model).where(model.id == database_id)).scalar_one_or_none()

    if obj is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.OBJECT_NOT_FOUND)

    obj.content = request.new_content.strip()
    return empty_pb2.Empty()

683 

def AddUsersToModerationUserList(self, request, context, session):
    """Add the requested users to a moderation user list and return the list's id.

    When ``request.moderation_list_id`` is not set, a new list is created.
    All users are resolved before anything is modified; the call aborts with
    NOT_FOUND if any user, or the given list, does not exist.
    """
    resolved_users = []
    for identifier in request.users:
        candidate = session.execute(select(User).where_username_or_email_or_id(identifier)).scalar_one_or_none()
        if candidate is None:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
        resolved_users.append(candidate)

    if request.moderation_list_id:
        moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
        if moderation_user_list is None:
            context.abort(grpc.StatusCode.NOT_FOUND, errors.MODERATION_USER_LIST_NOT_FOUND)
    else:
        # No list given: create one and flush so it gets an id.
        moderation_user_list = ModerationUserList()
        session.add(moderation_user_list)
        session.flush()

    # Skip users that are already members.
    for candidate in resolved_users:
        if candidate in moderation_user_list.users:
            continue
        moderation_user_list.users.append(candidate)

    return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)

712 

def ListModerationUserLists(self, request, context, session):
    """Return every moderation user list the user belongs to, with each list's member ids.

    Aborts with NOT_FOUND when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    member = session.execute(lookup).scalar_one_or_none()
    if member is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    list_pbs = []
    for moderation_list in member.moderation_user_lists:
        member_ids = [list_member.id for list_member in moderation_list.users]
        list_pbs.append(admin_pb2.ModerationList(moderation_list_id=moderation_list.id, member_ids=member_ids))

    return admin_pb2.ListModerationUserListsRes(moderation_lists=list_pbs)

724 

def RemoveUserFromModerationUserList(self, request, context, session):
    """Remove the user from the given moderation user list; delete the list if it becomes empty.

    Aborts with NOT_FOUND for an unknown user or list, INVALID_ARGUMENT when
    no list id is given, and FAILED_PRECONDITION when the user is not a member.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    member = session.execute(lookup).scalar_one_or_none()
    if member is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)
    if not request.moderation_list_id:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, errors.MISSING_MODERATION_USER_LIST_ID)

    target_list = session.get(ModerationUserList, request.moderation_list_id)
    if target_list is None:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.MODERATION_USER_LIST_NOT_FOUND)
    if member not in target_list.users:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.USER_NOT_IN_THE_MODERATION_USER_LIST)

    target_list.users.remove(member)

    # A list with no members left is removed altogether.
    if not target_list.users:
        session.delete(target_list)

    return empty_pb2.Empty()

745 

def CreateAccountDeletionLink(self, request, context, session):
    """Create an account deletion token for the user and return the confirmation URL.

    The token expires after ``request.expiry_days`` days, defaulting to 7 when
    the field is unset (0). Aborts with NOT_FOUND when no user matches
    ``request.user``.

    Previously ``expiry_days`` was computed but ignored and every token
    expired after a hard-coded two hours.
    """
    user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    expiry_days = request.expiry_days or 7
    token = AccountDeletionToken(
        token=urlsafe_secure_token(),
        user=user,
        expiry=now() + timedelta(days=expiry_days),
    )
    session.add(token)

    return admin_pb2.CreateAccountDeletionLinkRes(
        account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token)
    )

756 

def AccessStats(self, request, context, session):
    """Summarise a user's API activity per (IP address, user agent) pair within a time window.

    The window defaults to the last 90 days up to now when ``start_time`` /
    ``end_time`` are not set on the request. Rows are ordered by most recent
    activity first. Aborts with NOT_FOUND when no user matches ``request.user``.

    Fixes over the previous implementation:
    * the upper bound was written ``period >= end_time``, so the window only
      matched activity at or after its own end; it is now ``period <= end_time``;
    * presence of the timestamp fields was tested by truthiness, but protobuf
      sub-message fields do not evaluate falsy when unset, so the defaults
      never applied; presence is now tested with ``HasField``.
    """
    user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not user:
        context.abort(grpc.StatusCode.NOT_FOUND, errors.USER_NOT_FOUND)

    if request.HasField("start_time"):
        start_time = to_aware_datetime(request.start_time)
    else:
        start_time = now() - timedelta(days=90)
    if request.HasField("end_time"):
        end_time = to_aware_datetime(request.end_time)
    else:
        end_time = now()

    # One row per (ip, user agent): total calls, number of periods, first and last period seen.
    user_activity = session.execute(
        select(
            UserActivity.ip_address,
            UserActivity.user_agent,
            func.sum(UserActivity.api_calls),
            func.count(UserActivity.period),
            func.min(UserActivity.period),
            func.max(UserActivity.period),
        )
        .where(UserActivity.user_id == user.id)
        .where(UserActivity.period >= start_time)
        .where(UserActivity.period <= end_time)
        .order_by(func.max(UserActivity.period).desc())
        .group_by(UserActivity.ip_address, UserActivity.user_agent)
    ).all()

    out = admin_pb2.AccessStatsRes()

    for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
        user_agent_data = user_agents_parse(user_agent or "")
        # geoip_asn may return nothing; the three AS fields are then left unset.
        asn = geoip_asn(ip_address)
        out.stats.append(
            admin_pb2.AccessStat(
                ip_address=ip_address,
                asn=str(asn[0]) if asn else None,
                asorg=str(asn[1]) if asn else None,
                asnetwork=str(asn[2]) if asn else None,
                user_agent=user_agent,
                operating_system=user_agent_data.os.family,
                browser=user_agent_data.browser.family,
                device=user_agent_data.device.family,
                approximate_location=geoip_approximate_location(ip_address) or "Unknown",
                api_call_count=api_call_count,
                periods_count=periods_count,
                first_seen=Timestamp_from_datetime(first_seen),
                last_seen=Timestamp_from_datetime(last_seen),
            )
        )

    return out

805 

def SendBlogPostNotification(self, request, context, session):
    """Queue the ``generate_new_blog_post_notifications`` job for a blog post.

    Aborts with FAILED_PRECONDITION when the title is longer than 50
    characters or the blurb is longer than 100.
    """
    title_too_long = len(request.title) > 50
    if title_too_long:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_TITLE_TOO_LONG)

    blurb_too_long = len(request.blurb) > 100
    if blurb_too_long:
        context.abort(grpc.StatusCode.FAILED_PRECONDITION, errors.ADMIN_BLOG_BLURB_TOO_LONG)

    job_payload = jobs_pb2.GenerateNewBlogPostNotificationsPayload(
        url=request.url,
        title=request.title,
        blurb=request.blurb,
    )
    queue_job(session, "generate_new_blog_post_notifications", payload=job_payload)
    return empty_pb2.Empty()