Coverage for src/couchers/servicers/admin.py: 69%

347 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-08 11:38 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy.sql import and_, func, or_, select 

7from user_agents import parse as user_agents_parse 

8 

9from couchers import urls 

10from couchers.crypto import urlsafe_secure_token 

11from couchers.helpers.badges import user_add_badge, user_remove_badge 

12from couchers.helpers.geoip import geoip_approximate_location, geoip_asn 

13from couchers.helpers.strong_verification import get_strong_verification_fields 

14from couchers.jobs.enqueue import queue_job 

15from couchers.models import ( 

16 AccountDeletionToken, 

17 Comment, 

18 ContentReport, 

19 Discussion, 

20 Event, 

21 EventOccurrence, 

22 GroupChat, 

23 GroupChatSubscription, 

24 HostRequest, 

25 LanguageAbility, 

26 Message, 

27 ModerationUserList, 

28 ModNote, 

29 Reference, 

30 Reply, 

31 User, 

32 UserActivity, 

33 UserBadge, 

34) 

35from couchers.notifications.notify import notify 

36from couchers.proto import admin_pb2, admin_pb2_grpc, notification_data_pb2 

37from couchers.proto.internal import jobs_pb2 

38from couchers.resources import get_badge_dict 

39from couchers.servicers.api import user_model_to_pb 

40from couchers.servicers.auth import create_session 

41from couchers.servicers.threads import unpack_thread_id 

42from couchers.sql import couchers_select as select 

43from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime 

44 

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Upper bound for page sizes in the paginated admin RPCs below; also used as
# the page size when a request leaves ``page_size`` unset.
MAX_PAGINATION_LENGTH = 250

48 

49 

def _user_to_details(session, user):
    """Build the admin ``UserDetails`` message for ``user``.

    Copies basic account fields from the model, merges in whatever
    ``get_strong_verification_fields`` returns as extra keyword arguments,
    and counts the user's mod notes split by ``ModNote.is_pending``.
    """
    verification_fields = get_strong_verification_fields(session, user)
    badge_ids = [user_badge.badge_id for user_badge in user.badges]
    pending_count = user.mod_notes.where(ModNote.is_pending).count()
    acknowledged_count = user.mod_notes.where(~ModNote.is_pending).count()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.is_banned,
        deleted=user.is_deleted,
        do_not_email=user.do_not_email,
        badges=badge_ids,
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        admin_note=user.admin_note,
        pending_mod_notes_count=pending_count,
        acknowledged_mod_notes_count=acknowledged_count,
        **verification_fields,
    )

68 

69 

def _content_report_to_pb(content_report: ContentReport):
    """Convert a ``ContentReport`` model row into its ``admin_pb2.ContentReport`` message."""
    report = content_report
    reported_at = Timestamp_from_datetime(report.time)
    return admin_pb2.ContentReport(
        content_report_id=report.id,
        time=reported_at,
        reporting_user_id=report.reporting_user_id,
        author_user_id=report.author_user_id,
        reason=report.reason,
        description=report.description,
        content_ref=report.content_ref,
        user_agent=report.user_agent,
        page=report.page,
    )

82 

83 

def append_admin_note(session, context, user, note):
    """Append a timestamped, attributed entry to ``user.admin_note``.

    Aborts with INVALID_ARGUMENT ("admin_note_cant_be_empty") when ``note``
    is empty or whitespace-only. The entry records the acting admin
    (``context.user_id``) by id and username.
    """
    if not note.strip():
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")

    admin_query = select(User).where(User.id == context.user_id)
    admin = session.execute(admin_query).scalar_one()
    user.admin_note = (
        user.admin_note + f"\n[{now().isoformat()}] (id: {admin.id}, username: {admin.username}) {note}\n"
    )

89 

90 

91class Admin(admin_pb2_grpc.AdminServicer): 

def GetUserDetails(self, request, context, session):
    """Return admin ``UserDetails`` for the user identified by ``request.user``.

    Aborts with NOT_FOUND ("user_not_found") when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
    return _user_to_details(session, user)

97 

def GetUser(self, request, context, session):
    """Return the result of ``user_model_to_pb`` for the user identified by ``request.user``.

    The conversion is called with ``is_admin_see_ghosts=True``.
    Aborts with NOT_FOUND ("user_not_found") when no user matches.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
    return user_model_to_pb(user, session, context, is_admin_see_ghosts=True)

103 

def SearchUsers(self, request, context, session):
    """Search users by the optional filters set on ``request``, paginated by user id.

    Every filter that is set narrows the result (filters are ANDed). Text
    filters are passed to ``ilike`` as given, so any wildcards must be
    supplied by the caller. Results are ordered by ascending ``User.id``;
    ``page_token`` is the id of the first user of the requested page, and
    ``next_page_token`` is only set when a further page exists.
    """
    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    statement = select(User)

    # Case-insensitive pattern filters on text columns.
    for pattern, column in (
        (request.username, User.username),
        (request.email, User.email),
        (request.name, User.name),
        (request.admin_note, User.admin_note),
        (request.city, User.city),
    ):
        if pattern:
            statement = statement.where(column.ilike(pattern))

    # Inclusive range filters.
    if request.min_user_id:
        statement = statement.where(User.id >= request.min_user_id)
    if request.max_user_id:
        statement = statement.where(User.id <= request.max_user_id)
    if request.min_birthdate:
        statement = statement.where(User.birthdate >= parse_date(request.min_birthdate))
    if request.max_birthdate:
        statement = statement.where(User.birthdate <= parse_date(request.max_birthdate))
    if request.min_joined_date:
        statement = statement.where(User.joined >= parse_date(request.min_joined_date))
    if request.max_joined_date:
        statement = statement.where(User.joined <= parse_date(request.max_joined_date))
    if request.min_last_active_date:
        statement = statement.where(User.last_active >= parse_date(request.min_last_active_date))
    if request.max_last_active_date:
        statement = statement.where(User.last_active <= parse_date(request.max_last_active_date))

    # The gender filter used to be applied twice; once is sufficient.
    if request.genders:
        statement = statement.where(User.gender.in_(request.genders))

    if request.language_codes:
        # NOTE(review): a user with several matching LanguageAbility rows may be
        # returned more than once by this join -- confirm and deduplicate if so.
        statement = statement.join(
            LanguageAbility,
            and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)),
        )

    # Wrapper-typed flags: only filter when the field was explicitly set.
    if request.HasField("is_deleted"):
        statement = statement.where(User.is_deleted == request.is_deleted.value)
    if request.HasField("is_banned"):
        statement = statement.where(User.is_banned == request.is_banned.value)
    if request.HasField("has_avatar"):
        if request.has_avatar.value:
            statement = statement.where(User.avatar_key.is_not(None))
        else:
            statement = statement.where(User.avatar_key.is_(None))

    # Fetch one extra row to learn whether another page exists.
    users = (
        session.execute(statement.where(User.id >= next_user_id).order_by(User.id).limit(page_size + 1))
        .scalars()
        .all()
    )
    # NOTE(review): this logs the full list of user objects at INFO level; looks like a debugging leftover.
    logger.info(users)
    return admin_pb2.SearchUsersRes(
        users=[_user_to_details(session, user) for user in users[:page_size]],
        next_page_token=str(users[-1].id) if len(users) > page_size else None,
    )

162 

def ChangeUserGender(self, request, context, session):
    """Set the user's gender to ``request.gender``, commit, and notify the user.

    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``. Returns the updated ``UserDetails``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.gender = request.gender
    # Committed before the notification is created.
    session.commit()

    payload = notification_data_pb2.GenderChange(gender=request.gender)
    notify(session, user_id=user.id, topic_action="gender:change", data=payload)

    return _user_to_details(session, user)

180 

def ChangeUserBirthdate(self, request, context, session):
    """Set the user's birthdate from ``request.birthdate``, commit, and notify the user.

    The stored value is ``parse_date(request.birthdate)``; the notification
    carries ``request.birthdate`` as sent. Aborts with NOT_FOUND
    ("user_not_found") when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.birthdate = parse_date(request.birthdate)
    # Committed before the notification is created.
    session.commit()

    payload = notification_data_pb2.BirthdateChange(birthdate=request.birthdate)
    notify(session, user_id=user.id, topic_action="birthdate:change", data=payload)

    return _user_to_details(session, user)

198 

def AddBadge(self, request, context, session):
    """Give the user the badge ``request.badge_id``.

    Aborts with:
      * NOT_FOUND ("user_not_found") when no user matches ``request.user``;
      * NOT_FOUND ("badge_not_found") when the badge id is unknown;
      * FAILED_PRECONDITION ("admin_cannot_edit_badge") when the badge is
        not marked ``admin_editable``;
      * FAILED_PRECONDITION ("user_already_has_badge") when the user
        already holds it.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")

    if not badge["admin_editable"]:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")

    already_held = any(badge["id"] == held.badge_id for held in user.badges)
    if already_held:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_badge")

    user_add_badge(session, user.id, request.badge_id)

    return _user_to_details(session, user)

217 

def RemoveBadge(self, request, context, session):
    """Remove the badge ``request.badge_id`` from the user.

    Aborts with:
      * NOT_FOUND ("user_not_found") when no user matches ``request.user``;
      * NOT_FOUND ("badge_not_found") when the badge id is unknown;
      * FAILED_PRECONDITION ("admin_cannot_edit_badge") when the badge is
        not marked ``admin_editable``;
      * FAILED_PRECONDITION ("user_does_not_have_badge") when the user
        does not hold it.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    badge = get_badge_dict().get(request.badge_id)
    if not badge:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")

    if not badge["admin_editable"]:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")

    held_query = select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge["id"])
    held_badge = session.execute(held_query).scalar_one_or_none()
    if not held_badge:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_badge")

    user_remove_badge(session, user.id, request.badge_id)

    return _user_to_details(session, user)

239 

def SetPassportSexGenderException(self, request, context, session):
    """Set ``has_passport_sex_gender_exception`` on the user from the request.

    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``. Returns the updated ``UserDetails``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
    return _user_to_details(session, user)

246 

def BanUser(self, request, context, session):
    """Ban the user and record ``request.admin_note`` in their admin note.

    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``; ``append_admin_note`` aborts on an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # The note is validated and appended before the ban flag is set.
    append_admin_note(session, context, user, request.admin_note)
    user.is_banned = True
    return _user_to_details(session, user)

254 

def UnbanUser(self, request, context, session):
    """Lift the user's ban and record ``request.admin_note`` in their admin note.

    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``; ``append_admin_note`` aborts on an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # The note is validated and appended before the ban flag is cleared.
    append_admin_note(session, context, user, request.admin_note)
    user.is_banned = False
    return _user_to_details(session, user)

262 

def AddAdminNote(self, request, context, session):
    """Append ``request.admin_note`` to the user's admin note.

    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``; ``append_admin_note`` aborts on an empty note.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    append_admin_note(session, context, user, request.admin_note)
    return _user_to_details(session, user)

269 

def GetContentReport(self, request, context, session):
    """Return the content report with id ``request.content_report_id``.

    Aborts with NOT_FOUND ("content_report_not_found") when it does not exist.
    """
    report_query = select(ContentReport).where(ContentReport.id == request.content_report_id)
    content_report = session.execute(report_query).scalar_one_or_none()
    if not content_report:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "content_report_not_found")

    report_pb = _content_report_to_pb(content_report)
    return admin_pb2.GetContentReportRes(content_report=report_pb)

279 

def GetContentReportsForAuthor(self, request, context, session):
    """Return all content reports whose author is the given user, highest report id first.

    Aborts with NOT_FOUND ("user_not_found") when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    reports_query = (
        select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
    )
    reports = session.execute(reports_query).scalars().all()

    report_pbs = []
    for report in reports:
        report_pbs.append(_content_report_to_pb(report))
    return admin_pb2.GetContentReportsForAuthorRes(content_reports=report_pbs)

294 

def SendModNote(self, request, context, session):
    """Create a ``ModNote`` for the user and, unless suppressed, notify them.

    The note is created by the calling admin (``context.user_id``). No
    notification is sent when ``request.do_not_notify`` is set. Aborts with
    NOT_FOUND ("user_not_found") when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    mod_note = ModNote(
        user_id=user.id,
        internal_id=request.internal_id,
        creator_user_id=context.user_id,
        note_content=request.content,
    )
    session.add(mod_note)
    # Flushed before the notification and the note counts in the response are produced.
    session.flush()

    should_notify = not request.do_not_notify
    if should_notify:
        notify(session, user_id=user.id, topic_action="modnote:create")

    return _user_to_details(session, user)

317 

def MarkUserNeedsLocationUpdate(self, request, context, session):
    """Set ``needs_to_update_location`` on the user.

    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``. Returns the updated ``UserDetails``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.needs_to_update_location = True
    return _user_to_details(session, user)

324 

def DeleteUser(self, request, context, session):
    """Mark the user as deleted by setting ``is_deleted``; the row itself is kept.

    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``. Returns the updated ``UserDetails``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.is_deleted = True
    return _user_to_details(session, user)

331 

def RecoverDeletedUser(self, request, context, session):
    """Clear the user's ``is_deleted`` flag.

    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``. Returns the updated ``UserDetails``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.is_deleted = False
    return _user_to_details(session, user)

338 

def CreateApiKey(self, request, context, session):
    """Create a 365-day API-key session for the user and send them the key by notification.

    The token is delivered only through the "api_key:create" notification;
    the RPC response is the user's ``UserDetails``. Aborts with NOT_FOUND
    ("user_not_found") when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    key_lifetime = timedelta(days=365)
    token, expiry = create_session(
        context,
        session,
        user,
        long_lived=True,
        is_api_key=True,
        duration=key_lifetime,
        set_cookie=False,
    )

    payload = notification_data_pb2.ApiKeyCreate(
        api_key=token,
        expiry=Timestamp_from_datetime(expiry),
    )
    notify(session, user_id=user.id, topic_action="api_key:create", data=payload)

    return _user_to_details(session, user)

358 

def GetChats(self, request, context, session):
    """Return every host request and group chat the user takes part in, with all messages.

    Host requests are those where the user is host or surfer, ordered by
    descending conversation id. Group chats are those the user has a
    subscription to. Aborts with NOT_FOUND ("user_not_found") when no user
    matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # user id -> UserDetails, so each participant is loaded and converted once per call.
    details_by_user_id = {}

    def get_user_details(user_id):
        """Return (and memoise) the ``UserDetails`` for ``user_id``."""
        if user_id not in details_by_user_id:
            participant = session.execute(select(User).where(User.id == user_id)).scalar_one()
            details_by_user_id[user_id] = _user_to_details(session, participant)
        return details_by_user_id[user_id]

    def message_to_pb(message):
        """Convert one ``Message`` row; unset enums and text become empty strings."""
        message_type = message.message_type.name if message.message_type else ""
        status_target = message.host_request_status_target.name if message.host_request_status_target else ""
        return admin_pb2.ChatMessage(
            message_id=message.id,
            author=get_user_details(message.author_id),
            time=Timestamp_from_datetime(message.time),
            message_type=message_type,
            text=message.text or "",
            host_request_status_target=status_target,
            target=get_user_details(message.target_id) if message.target_id else None,
        )

    def get_messages_for_conversation(conversation_id):
        """Return all messages of a conversation, ordered by ascending message id."""
        messages_query = (
            select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
        )
        rows = session.execute(messages_query).scalars().all()
        return [message_to_pb(row) for row in rows]

    def get_host_request_pb(host_request):
        """Convert one ``HostRequest`` including both parties and its messages."""
        status = host_request.status.name if host_request.status else ""
        return admin_pb2.AdminHostRequest(
            host_request_id=host_request.conversation_id,
            surfer=get_user_details(host_request.surfer_user_id),
            host=get_user_details(host_request.host_user_id),
            status=status,
            from_date=date_to_api(host_request.from_date),
            to_date=date_to_api(host_request.to_date),
            created=Timestamp_from_datetime(host_request.conversation.created),
            messages=get_messages_for_conversation(host_request.conversation_id),
        )

    def get_group_chat_pb(group_chat):
        """Convert one ``GroupChat`` including its members (by join time) and its messages."""
        subscriptions_query = (
            select(GroupChatSubscription)
            .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
            .order_by(GroupChatSubscription.joined.asc())
        )
        subscriptions = session.execute(subscriptions_query).scalars().all()

        members = []
        for subscription in subscriptions:
            members.append(
                admin_pb2.GroupChatMember(
                    user=get_user_details(subscription.user_id),
                    joined=Timestamp_from_datetime(subscription.joined),
                    left=Timestamp_from_datetime(subscription.left) if subscription.left else None,
                    role=subscription.role.name if subscription.role else "",
                )
            )

        return admin_pb2.AdminGroupChat(
            group_chat_id=group_chat.conversation_id,
            title=group_chat.title or "",
            is_dm=group_chat.is_dm,
            creator=get_user_details(group_chat.creator_id),
            members=members,
            messages=get_messages_for_conversation(group_chat.conversation_id),
        )

    # Host requests where the user is on either side.
    host_requests_query = (
        select(HostRequest)
        .where(or_(HostRequest.host_user_id == user.id, HostRequest.surfer_user_id == user.id))
        .order_by(HostRequest.conversation_id.desc())
    )
    host_requests = session.execute(host_requests_query).scalars().all()

    # Ids of the group chats the user has a subscription to.
    group_chat_ids_query = (
        select(GroupChatSubscription.group_chat_id)
        .where(GroupChatSubscription.user_id == user.id)
        .order_by(GroupChatSubscription.joined.desc())
    )
    group_chat_ids = session.execute(group_chat_ids_query).scalars().all()

    # This query has no ORDER BY of its own, so the ordering of the id list
    # above is not carried over to the chats.
    group_chats_query = select(GroupChat).where(GroupChat.conversation_id.in_(group_chat_ids))
    group_chats = session.execute(group_chats_query).scalars().all()

    return admin_pb2.GetChatsRes(
        user=get_user_details(user.id),
        host_requests=[get_host_request_pb(host_request) for host_request in host_requests],
        group_chats=[get_group_chat_pb(group_chat) for group_chat in group_chats],
    )

466 

def DeleteEvent(self, request, context, session):
    """Mark an event occurrence as deleted and queue its delete-notification job.

    ``request.event_id`` is matched against ``EventOccurrence.id``; only
    occurrences not already deleted are considered. Aborts with NOT_FOUND
    ("event_not_found") when there is no such occurrence.
    """
    occurrence_query = (
        select(Event, EventOccurrence)
        .where(EventOccurrence.id == request.event_id)
        .where(EventOccurrence.event_id == Event.id)
        .where(~EventOccurrence.is_deleted)
    )
    row = session.execute(occurrence_query).one_or_none()

    if not row:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")

    # Only the occurrence is modified; the parent event is not needed here.
    _event, occurrence = row
    occurrence.is_deleted = True

    payload = jobs_pb2.GenerateEventDeleteNotificationsPayload(occurrence_id=occurrence.id)
    queue_job(session, "generate_event_delete_notifications", payload=payload)

    return empty_pb2.Empty()

491 

def ListUserIds(self, request, context, session):
    """List ids of users who joined within ``[start_time, end_time]``, highest id first.

    Paginated by user id: ``page_token`` is the id of the first (highest) id
    of the requested page, and ``next_page_token`` is only set when a
    further page exists.
    """
    joined_from = to_aware_datetime(request.start_time)
    joined_until = to_aware_datetime(request.end_time)

    page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
    next_user_id = int(request.page_token) if request.page_token else 0

    ids_query = select(User.id)
    if next_user_id != 0:
        # A token of 0 (or no token) means "start from the top", i.e. no id bound.
        ids_query = ids_query.where(User.id <= next_user_id)
    ids_query = (
        ids_query.where(User.joined >= joined_from)
        .where(User.joined <= joined_until)
        .order_by(User.id.desc())
        # One extra row tells us whether another page exists.
        .limit(page_size + 1)
    )
    user_ids = session.execute(ids_query).scalars().all()

    has_more = len(user_ids) > page_size
    return admin_pb2.ListUserIdsRes(
        user_ids=user_ids[:page_size],
        next_page_token=str(user_ids[-1]) if has_more else None,
    )

516 

def EditReferenceText(self, request, context, session):
    """Replace a reference's text with the stripped ``request.new_text``.

    Aborts with NOT_FOUND ("reference_not_found") when the reference does
    not exist, and with INVALID_ARGUMENT ("reference_no_text") when the new
    text is empty or whitespace-only.
    """
    reference_query = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(reference_query).scalar_one_or_none()

    if reference is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")

    new_text = request.new_text.strip()
    if not new_text:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "reference_no_text")

    reference.text = new_text
    return empty_pb2.Empty()

528 

def DeleteReference(self, request, context, session):
    """Mark the reference ``request.reference_id`` as deleted by setting ``is_deleted``.

    Aborts with NOT_FOUND ("reference_not_found") when it does not exist.
    """
    reference_query = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(reference_query).scalar_one_or_none()

    if reference is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")

    reference.is_deleted = True
    return empty_pb2.Empty()

537 

def EditDiscussion(self, request, context, session):
    """Update a discussion's title and/or content.

    Only fields that are non-empty on the request are changed; new values
    are stripped of surrounding whitespace. Aborts with NOT_FOUND
    ("discussion_not_found") when the discussion does not exist.
    """
    discussion_query = select(Discussion).where(Discussion.id == request.discussion_id)
    discussion = session.execute(discussion_query).scalar_one_or_none()
    if not discussion:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")

    new_title = request.new_title
    if new_title:
        discussion.title = new_title.strip()

    new_content = request.new_content
    if new_content:
        discussion.content = new_content.strip()

    return empty_pb2.Empty()

549 

def EditReply(self, request, context, session):
    """Replace the content of a comment or reply with the stripped ``request.new_content``.

    ``request.reply_id`` is unpacked into a database id and a depth:
    depth 1 selects a ``Comment``, depth 2 a ``Reply``. Aborts with
    NOT_FOUND ("object_not_found") when the object does not exist or the
    depth is neither 1 nor 2.
    """
    database_id, depth = unpack_thread_id(request.reply_id)
    if depth == 1:
        obj = session.execute(select(Comment).where(Comment.id == database_id)).scalar_one_or_none()
    elif depth == 2:
        obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none()
    else:
        # Previously ``obj`` was left unbound here, so an unexpected depth raised
        # UnboundLocalError instead of a clean NOT_FOUND.
        obj = None

    if not obj:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "object_not_found")

    obj.content = request.new_content.strip()
    return empty_pb2.Empty()

560 

def AddUsersToModerationUserList(self, request, context, session):
    """Add several users to a moderation user list and return the list's id.

    When ``request.moderation_list_id`` is unset a new list is created.
    Users already on the list are left as they are. Aborts with NOT_FOUND
    ("user_not_found") if any requested user does not exist, and with
    NOT_FOUND ("moderation_user_list_not_found") if the given list id does
    not exist.
    """
    # Resolve every requested user before touching any list.
    resolved_users = []
    for user_ref in request.users:
        lookup = select(User).where_username_or_email_or_id(user_ref)
        found = session.execute(lookup).scalar_one_or_none()
        if not found:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
        resolved_users.append(found)

    if request.moderation_list_id:
        moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
        if not moderation_user_list:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")
    else:
        # No list given: create a fresh one and flush so that it has an id.
        moderation_user_list = ModerationUserList()
        session.add(moderation_user_list)
        session.flush()

    for candidate in resolved_users:
        if candidate not in moderation_user_list.users:
            moderation_user_list.users.append(candidate)

    return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)

589 

def ListModerationUserLists(self, request, context, session):
    """List every moderation user list the user is on, each with its member ids.

    Aborts with NOT_FOUND ("user_not_found") when no user matches ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    moderation_lists = []
    for user_list in user.moderation_user_lists:
        member_ids = [member.id for member in user_list.users]
        moderation_lists.append(admin_pb2.ModerationList(moderation_list_id=user_list.id, member_ids=member_ids))

    return admin_pb2.ListModerationUserListsRes(moderation_lists=moderation_lists)

601 

def RemoveUserFromModerationUserList(self, request, context, session):
    """Remove a user from the given moderation user list; delete the list once it is empty.

    Aborts with:
      * NOT_FOUND ("user_not_found") when no user matches ``request.user``;
      * INVALID_ARGUMENT ("missing_moderation_user_list_id") when no list id is given;
      * NOT_FOUND ("moderation_user_list_not_found") when the list does not exist;
      * FAILED_PRECONDITION ("user_not_in_the_moderation_user_list") when
        the user is not on the list.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    list_id = request.moderation_list_id
    if not list_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_moderation_user_list_id")

    moderation_user_list = session.get(ModerationUserList, list_id)
    if not moderation_user_list:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")

    members = moderation_user_list.users
    if user not in members:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_the_moderation_user_list")

    members.remove(user)

    # A list with no members left is removed entirely.
    if not moderation_user_list.users:
        session.delete(moderation_user_list)

    return empty_pb2.Empty()

622 

def CreateAccountDeletionLink(self, request, context, session):
    """Create an account-deletion token for the user and return the confirmation URL.

    The token expires after ``request.expiry_days`` days, or 7 days when
    the field is unset. Aborts with NOT_FOUND ("user_not_found") when no
    user matches ``request.user``.
    """
    user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    expiry_days = request.expiry_days or 7
    # ``expiry_days`` used to be computed and then ignored in favour of a
    # hard-coded two-hour expiry; honour the requested lifetime instead.
    token = AccountDeletionToken(
        token=urlsafe_secure_token(),
        user=user,
        expiry=now() + timedelta(days=expiry_days),
    )
    session.add(token)

    return admin_pb2.CreateAccountDeletionLinkRes(
        account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token)
    )

633 

def AccessStats(self, request, context, session):
    """Summarise the user's recorded activity per (IP address, user agent) pair.

    The window is ``[start_time, end_time]``, defaulting to the last 90
    days up to now. For each pair the response holds the summed API calls,
    the number of activity periods, the first and last period seen, the
    parsed user-agent families, and GeoIP ASN / approximate location data.
    Rows are ordered by most recent activity first. Aborts with NOT_FOUND
    ("user_not_found") when no user matches ``request.user``.
    """
    user = session.execute(select(User).where_username_or_email_or_id(request.user)).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # NOTE(review): if start_time/end_time are message-typed fields, an unset
    # field may still be truthy here; HasField(...) may be what is intended -- confirm.
    start_time = to_aware_datetime(request.start_time) if request.start_time else now() - timedelta(days=90)
    end_time = to_aware_datetime(request.end_time) if request.end_time else now()

    user_activity = session.execute(
        select(
            UserActivity.ip_address,
            UserActivity.user_agent,
            func.sum(UserActivity.api_calls),
            func.count(UserActivity.period),
            func.min(UserActivity.period),
            func.max(UserActivity.period),
        )
        .where(UserActivity.user_id == user.id)
        .where(UserActivity.period >= start_time)
        # Upper bound of the window. This used to be ``>= end_time``, which
        # excluded everything before ``end_time`` (all data, with the defaults).
        .where(UserActivity.period <= end_time)
        .order_by(func.max(UserActivity.period).desc())
        .group_by(UserActivity.ip_address, UserActivity.user_agent)
    ).all()

    out = admin_pb2.AccessStatsRes()

    for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
        # A missing user agent is parsed as an empty string.
        user_agent_data = user_agents_parse(user_agent or "")
        # ASN lookup may yield nothing; the three ASN fields are then left unset.
        asn = geoip_asn(ip_address)
        out.stats.append(
            admin_pb2.AccessStat(
                ip_address=ip_address,
                asn=str(asn[0]) if asn else None,
                asorg=str(asn[1]) if asn else None,
                asnetwork=str(asn[2]) if asn else None,
                user_agent=user_agent,
                operating_system=user_agent_data.os.family,
                browser=user_agent_data.browser.family,
                device=user_agent_data.device.family,
                approximate_location=geoip_approximate_location(ip_address) or "Unknown",
                api_call_count=api_call_count,
                periods_count=periods_count,
                first_seen=Timestamp_from_datetime(first_seen),
                last_seen=Timestamp_from_datetime(last_seen),
            )
        )

    return out

682 

def SetLastDonated(self, request, context, session):
    """Set or clear the user's ``last_donated`` timestamp.

    When ``last_donated`` is present on the request it is stored (converted
    with ``to_aware_datetime``); otherwise the stored value is cleared.
    Aborts with NOT_FOUND ("user_not_found") when no user matches
    ``request.user``.
    """
    lookup = select(User).where_username_or_email_or_id(request.user)
    user = session.execute(lookup).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.last_donated = to_aware_datetime(request.last_donated) if request.HasField("last_donated") else None

    return _user_to_details(session, user)