Coverage for app / backend / src / couchers / servicers / admin.py: 77%

471 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import logging 

2from datetime import timedelta 

3 

4import grpc 

5from google.protobuf import empty_pb2 

6from sqlalchemy import select 

7from sqlalchemy.orm import Session, selectinload 

8from sqlalchemy.sql import and_, func, or_ 

9from user_agents import parse as user_agents_parse 

10 

11from couchers import urls 

12from couchers.context import CouchersContext 

13from couchers.crypto import urlsafe_secure_token 

14from couchers.helpers.badges import user_add_badge, user_remove_badge 

15from couchers.helpers.geoip import geoip_approximate_location, geoip_asn 

16from couchers.helpers.strong_verification import get_strong_verification_fields 

17from couchers.jobs.enqueue import queue_job 

18from couchers.models import ( 

19 AccountDeletionToken, 

20 AdminAction, 

21 AdminActionLevel, 

22 AdminTag, 

23 Comment, 

24 ContentReport, 

25 Discussion, 

26 Event, 

27 EventOccurrence, 

28 FriendRelationship, 

29 GroupChat, 

30 GroupChatSubscription, 

31 HostRequest, 

32 LanguageAbility, 

33 Message, 

34 ModerationUserList, 

35 ModNote, 

36 Reference, 

37 Reply, 

38 User, 

39 UserActivity, 

40 UserAdminTag, 

41 UserBadge, 

42) 

43from couchers.models.notifications import NotificationTopicAction 

44from couchers.models.uploads import has_avatar_photo_expression 

45from couchers.notifications.notify import notify 

46from couchers.proto import admin_pb2, admin_pb2_grpc, api_pb2, notification_data_pb2 

47from couchers.proto.internal import jobs_pb2 

48from couchers.resources import get_badge_dict 

49from couchers.servicers.api import user_model_to_pb 

50from couchers.servicers.auth import create_session 

51from couchers.servicers.events import generate_event_delete_notifications 

52from couchers.servicers.threads import unpack_thread_id 

53from couchers.sql import to_bool, username_or_email_or_id 

54from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime 

55 

56logger = logging.getLogger(__name__) 

57 

58MAX_PAGINATION_LENGTH = 250 

59 

60 

61adminactionlevel2api = { 

62 AdminActionLevel.debug: admin_pb2.ADMIN_ACTION_LEVEL_DEBUG, 

63 AdminActionLevel.normal: admin_pb2.ADMIN_ACTION_LEVEL_NORMAL, 

64 AdminActionLevel.high: admin_pb2.ADMIN_ACTION_LEVEL_HIGH, 

65} 

66 

67api2adminactionlevel = { 

68 admin_pb2.ADMIN_ACTION_LEVEL_DEBUG: AdminActionLevel.debug, 

69 admin_pb2.ADMIN_ACTION_LEVEL_NORMAL: AdminActionLevel.normal, 

70 admin_pb2.ADMIN_ACTION_LEVEL_HIGH: AdminActionLevel.high, 

71} 

72 

73 

74def log_admin_action( 

75 session: Session, 

76 context: CouchersContext, 

77 target_user: User, 

78 action_type: str, 

79 note: str | None = None, 

80 tag: str | None = None, 

81 level: AdminActionLevel = AdminActionLevel.normal, 

82) -> AdminAction: 

83 action = AdminAction( 

84 admin_user_id=context.user_id, 

85 target_user_id=target_user.id, 

86 action_type=action_type, 

87 level=level, 

88 note=note, 

89 tag=tag, 

90 ) 

91 session.add(action) 

92 session.flush() 

93 return action 

94 

95 

96def _user_to_details(session: Session, user: User) -> admin_pb2.UserDetails: 

97 # Query admin actions for this user 

98 actions = session.execute( 

99 select(AdminAction, User.username) 

100 .join(User, AdminAction.admin_user_id == User.id) 

101 .where(AdminAction.target_user_id == user.id) 

102 .order_by(AdminAction.created.asc()) 

103 ).all() 

104 

105 action_pbs = [] 

106 for action, admin_username in actions: 

107 action_pbs.append( 

108 admin_pb2.AdminActionLog( 

109 admin_action_id=action.id, 

110 created=Timestamp_from_datetime(action.created), 

111 admin_user_id=action.admin_user_id, 

112 admin_username=admin_username, 

113 action_type=action.action_type, 

114 level=adminactionlevel2api[action.level], 

115 note=action.note or "", 

116 tag=action.tag or "", 

117 ) 

118 ) 

119 

120 # Query admin tags 

121 admin_tags = ( 

122 session.execute( 

123 select(AdminTag.tag) 

124 .join(UserAdminTag, UserAdminTag.admin_tag_id == AdminTag.id) 

125 .where(UserAdminTag.user_id == user.id) 

126 .order_by(AdminTag.tag) 

127 ) 

128 .scalars() 

129 .all() 

130 ) 

131 

132 return admin_pb2.UserDetails( 

133 user_id=user.id, 

134 username=user.username, 

135 name=user.name, 

136 email=user.email, 

137 gender=user.gender, 

138 birthdate=date_to_api(user.birthdate), 

139 banned=user.banned_at is not None, 

140 deleted=user.deleted_at is not None, 

141 do_not_email=user.do_not_email, 

142 badges=[badge.badge_id for badge in user.badges], 

143 **get_strong_verification_fields(session, user), 

144 has_passport_sex_gender_exception=user.has_passport_sex_gender_exception, 

145 pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(), 

146 acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(), 

147 admin_actions=action_pbs, 

148 admin_tags=list(admin_tags), 

149 mod_score=user.mod_score, 

150 ) 

151 

152 

153def _content_report_to_pb(content_report: ContentReport) -> admin_pb2.ContentReport: 

154 return admin_pb2.ContentReport( 

155 content_report_id=content_report.id, 

156 time=Timestamp_from_datetime(content_report.time), 

157 reporting_user_id=content_report.reporting_user_id, 

158 author_user_id=content_report.author_user_id, 

159 reason=content_report.reason, 

160 description=content_report.description, 

161 content_ref=content_report.content_ref, 

162 user_agent=content_report.user_agent, 

163 page=content_report.page, 

164 ) 

165 

166 

167def _reference_to_pb(reference: Reference) -> admin_pb2.AdminReference: 

168 return admin_pb2.AdminReference( 

169 reference_id=reference.id, 

170 from_user_id=reference.from_user_id, 

171 to_user_id=reference.to_user_id, 

172 reference_type=reference.reference_type.name, 

173 text=reference.text, 

174 private_text=reference.private_text or "", 

175 time=Timestamp_from_datetime(reference.time), 

176 host_request_id=reference.host_request_id or 0, 

177 rating=reference.rating, 

178 was_appropriate=reference.was_appropriate, 

179 is_deleted=reference.is_deleted, 

180 ) 

181 

182 

183class Admin(admin_pb2_grpc.AdminServicer): 

184 def GetUserDetails( 

185 self, request: admin_pb2.GetUserDetailsReq, context: CouchersContext, session: Session 

186 ) -> admin_pb2.UserDetails: 

187 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

188 if not user: 188 ↛ 189line 188 didn't jump to line 189 because the condition on line 188 was never true

189 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

190 return _user_to_details(session, user) 

191 

192 def GetUser(self, request: admin_pb2.GetUserReq, context: CouchersContext, session: Session) -> api_pb2.User: 

193 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

194 if not user: 194 ↛ 195line 194 didn't jump to line 195 because the condition on line 194 was never true

195 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

196 return user_model_to_pb(user, session, context, is_admin_see_ghosts=True) 

197 

198 def SearchUsers( 

199 self, request: admin_pb2.SearchUsersReq, context: CouchersContext, session: Session 

200 ) -> admin_pb2.SearchUsersRes: 

201 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

202 next_user_id = int(request.page_token) if request.page_token else 0 

203 statement = select(User) 

204 if request.username: 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true

205 statement = statement.where(User.username.ilike(request.username)) 

206 if request.email: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true

207 statement = statement.where(User.email.ilike(request.email)) 

208 if request.name: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true

209 statement = statement.where(User.name.ilike(request.name)) 

210 if request.admin_action_log: 

211 statement = statement.where( 

212 User.id.in_(select(AdminAction.target_user_id).where(AdminAction.note.ilike(request.admin_action_log))) 

213 ) 

214 if request.city: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true

215 statement = statement.where(User.city.ilike(request.city)) 

216 if request.min_user_id: 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true

217 statement = statement.where(User.id >= request.min_user_id) 

218 if request.max_user_id: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true

219 statement = statement.where(User.id <= request.max_user_id) 

220 if request.min_birthdate: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true

221 statement = statement.where(User.birthdate >= parse_date(request.min_birthdate)) 

222 if request.max_birthdate: 222 ↛ 223line 222 didn't jump to line 223 because the condition on line 222 was never true

223 statement = statement.where(User.birthdate <= parse_date(request.max_birthdate)) 

224 if request.genders: 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true

225 statement = statement.where(User.gender.in_(request.genders)) 

226 if request.min_joined_date: 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true

227 statement = statement.where(User.joined >= parse_date(request.min_joined_date)) 

228 if request.max_joined_date: 228 ↛ 229line 228 didn't jump to line 229 because the condition on line 228 was never true

229 statement = statement.where(User.joined <= parse_date(request.max_joined_date)) 

230 if request.min_last_active_date: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true

231 statement = statement.where(User.last_active >= parse_date(request.min_last_active_date)) 

232 if request.max_last_active_date: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true

233 statement = statement.where(User.last_active <= parse_date(request.max_last_active_date)) 

234 if request.genders: 234 ↛ 235line 234 didn't jump to line 235 because the condition on line 234 was never true

235 statement = statement.where(User.gender.in_(request.genders)) 

236 if request.language_codes: 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true

237 statement = statement.join( 

238 LanguageAbility, 

239 and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)), 

240 ) 

241 if request.HasField("is_deleted"): 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 statement = statement.where((User.deleted_at != None) == request.is_deleted.value) 

243 if request.HasField("is_banned"): 243 ↛ 244line 243 didn't jump to line 244 because the condition on line 243 was never true

244 statement = statement.where((User.banned_at != None) == request.is_banned.value) 

245 if request.HasField("has_avatar"): 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true

246 statement = statement.where(has_avatar_photo_expression(User) == request.has_avatar.value) 

247 if request.admin_tags: 

248 for tag_name in request.admin_tags: 

249 statement = statement.where( 

250 User.id.in_( 

251 select(UserAdminTag.user_id) 

252 .join(AdminTag, UserAdminTag.admin_tag_id == AdminTag.id) 

253 .where(AdminTag.tag == tag_name) 

254 ) 

255 ) 

256 users = ( 

257 session.execute( 

258 statement.where(User.id >= next_user_id) 

259 .order_by(User.id) 

260 .limit(page_size + 1) 

261 .options(selectinload(User.badges)) 

262 ) 

263 .scalars() 

264 .all() 

265 ) 

266 logger.info(users) 

267 return admin_pb2.SearchUsersRes( 

268 users=[_user_to_details(session, user) for user in users[:page_size]], 

269 next_page_token=str(users[-1].id) if len(users) > page_size else None, 

270 ) 

271 

272 def ChangeUserGender( 

273 self, request: admin_pb2.ChangeUserGenderReq, context: CouchersContext, session: Session 

274 ) -> admin_pb2.UserDetails: 

275 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

276 if not user: 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true

277 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

278 old_gender = user.gender 

279 user.gender = request.gender 

280 log_admin_action( 

281 session, context, user, "change_gender", note=f"Changed from '{old_gender}' to '{request.gender}'" 

282 ) 

283 session.commit() 

284 

285 notify( 

286 session, 

287 user_id=user.id, 

288 topic_action=NotificationTopicAction.gender__change, 

289 key="", 

290 data=notification_data_pb2.GenderChange( 

291 gender=request.gender, 

292 ), 

293 ) 

294 

295 return _user_to_details(session, user) 

296 

297 def ChangeUserBirthdate( 

298 self, request: admin_pb2.ChangeUserBirthdateReq, context: CouchersContext, session: Session 

299 ) -> admin_pb2.UserDetails: 

300 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

301 if not user: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true

302 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

303 if not (birthdate := parse_date(request.birthdate)): 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true

304 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_birthdate") 

305 

306 old_birthdate = user.birthdate 

307 user.birthdate = birthdate 

308 log_admin_action( 

309 session, context, user, "change_birthdate", note=f"Changed from {old_birthdate} to {request.birthdate}" 

310 ) 

311 session.commit() 

312 

313 notify( 

314 session, 

315 user_id=user.id, 

316 topic_action=NotificationTopicAction.birthdate__change, 

317 key="", 

318 data=notification_data_pb2.BirthdateChange( 

319 birthdate=request.birthdate, 

320 ), 

321 ) 

322 

323 return _user_to_details(session, user) 

324 

325 def AddBadge( 

326 self, request: admin_pb2.AddBadgeReq, context: CouchersContext, session: Session 

327 ) -> admin_pb2.UserDetails: 

328 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

329 if not user: 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true

330 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

331 

332 badge = get_badge_dict().get(request.badge_id) 

333 if not badge: 

334 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found") 

335 

336 if not badge.admin_editable: 

337 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge") 

338 

339 if badge.id in [b.badge_id for b in user.badges]: 

340 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_badge") 

341 

342 user_add_badge(session, user.id, request.badge_id) 

343 log_admin_action(session, context, user, "add_badge", note=f"Added badge {request.badge_id}") 

344 

345 return _user_to_details(session, user) 

346 

347 def RemoveBadge( 

348 self, request: admin_pb2.RemoveBadgeReq, context: CouchersContext, session: Session 

349 ) -> admin_pb2.UserDetails: 

350 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

351 if not user: 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true

352 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

353 

354 badge = get_badge_dict().get(request.badge_id) 

355 if not badge: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true

356 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found") 

357 

358 if not badge.admin_editable: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true

359 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge") 

360 

361 user_badge = session.execute( 

362 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge.id) 

363 ).scalar_one_or_none() 

364 if not user_badge: 

365 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_badge") 

366 

367 user_remove_badge(session, user.id, request.badge_id) 

368 log_admin_action(session, context, user, "remove_badge", note=f"Removed badge {request.badge_id}") 

369 

370 return _user_to_details(session, user) 

371 

372 def SetPassportSexGenderException( 

373 self, request: admin_pb2.SetPassportSexGenderExceptionReq, context: CouchersContext, session: Session 

374 ) -> admin_pb2.UserDetails: 

375 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

376 if not user: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true

377 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

378 old_exception = user.has_passport_sex_gender_exception 

379 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception 

380 log_admin_action( 

381 session, 

382 context, 

383 user, 

384 "set_passport_sex_gender_exception", 

385 note=f"Changed from {old_exception} to {request.passport_sex_gender_exception}", 

386 ) 

387 return _user_to_details(session, user) 

388 

389 def BanUser( 

390 self, request: admin_pb2.BanUserReq, context: CouchersContext, session: Session 

391 ) -> admin_pb2.UserDetails: 

392 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

393 if not user: 393 ↛ 394line 393 didn't jump to line 394 because the condition on line 393 was never true

394 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

395 if not request.admin_note.strip(): 395 ↛ 396line 395 didn't jump to line 396 because the condition on line 395 was never true

396 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty") 

397 log_admin_action(session, context, user, "ban", note=request.admin_note, level=AdminActionLevel.high) 

398 user.banned_at = now() 

399 return _user_to_details(session, user) 

400 

401 def UnbanUser( 

402 self, request: admin_pb2.UnbanUserReq, context: CouchersContext, session: Session 

403 ) -> admin_pb2.UserDetails: 

404 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

405 if not user: 405 ↛ 406line 405 didn't jump to line 406 because the condition on line 405 was never true

406 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

407 if not request.admin_note.strip(): 407 ↛ 408line 407 didn't jump to line 408 because the condition on line 407 was never true

408 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty") 

409 log_admin_action(session, context, user, "unban", note=request.admin_note, level=AdminActionLevel.high) 

410 user.banned_at = None 

411 return _user_to_details(session, user) 

412 

413 def AddAdminNote( 

414 self, request: admin_pb2.AddAdminNoteReq, context: CouchersContext, session: Session 

415 ) -> admin_pb2.UserDetails: 

416 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

417 if not user: 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true

418 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

419 if not request.admin_note.strip(): 

420 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty") 

421 level = api2adminactionlevel.get(request.level, AdminActionLevel.normal) 

422 log_admin_action(session, context, user, "note", note=request.admin_note, level=level) 

423 return _user_to_details(session, user) 

424 

425 def GetContentReport( 

426 self, request: admin_pb2.GetContentReportReq, context: CouchersContext, session: Session 

427 ) -> admin_pb2.GetContentReportRes: 

428 content_report = session.execute( 

429 select(ContentReport).where(ContentReport.id == request.content_report_id) 

430 ).scalar_one_or_none() 

431 if not content_report: 

432 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "content_report_not_found") 

433 return admin_pb2.GetContentReportRes( 

434 content_report=_content_report_to_pb(content_report), 

435 ) 

436 

437 def GetContentReportsForAuthor( 

438 self, request: admin_pb2.GetContentReportsForAuthorReq, context: CouchersContext, session: Session 

439 ) -> admin_pb2.GetContentReportsForAuthorRes: 

440 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

441 if not user: 441 ↛ 442line 441 didn't jump to line 442 because the condition on line 441 was never true

442 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

443 content_reports = ( 

444 session.execute( 

445 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc()) 

446 ) 

447 .scalars() 

448 .all() 

449 ) 

450 return admin_pb2.GetContentReportsForAuthorRes( 

451 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports], 

452 ) 

453 

454 def SendModNote( 

455 self, request: admin_pb2.SendModNoteReq, context: CouchersContext, session: Session 

456 ) -> admin_pb2.UserDetails: 

457 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

458 if not user: 458 ↛ 459line 458 didn't jump to line 459 because the condition on line 458 was never true

459 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

460 session.add( 

461 ModNote( 

462 user_id=user.id, 

463 internal_id=request.internal_id, 

464 creator_user_id=context.user_id, 

465 note_content=request.content, 

466 ) 

467 ) 

468 session.flush() 

469 notify_user = "No" if request.do_not_notify else "Yes" 

470 log_admin_action( 

471 session, 

472 context, 

473 user, 

474 "send_mod_note", 

475 note=f"Notify user: {notify_user}\n\n{request.content}", 

476 ) 

477 

478 if not request.do_not_notify: 

479 notify( 

480 session, 

481 user_id=user.id, 

482 topic_action=NotificationTopicAction.modnote__create, 

483 key="", 

484 ) 

485 

486 return _user_to_details(session, user) 

487 

488 def MarkUserNeedsLocationUpdate( 

489 self, request: admin_pb2.MarkUserNeedsLocationUpdateReq, context: CouchersContext, session: Session 

490 ) -> admin_pb2.UserDetails: 

491 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

492 if not user: 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true

493 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

494 user.needs_to_update_location = True 

495 log_admin_action( 

496 session, context, user, "mark_needs_location_update", note="Marked user as needing location update" 

497 ) 

498 return _user_to_details(session, user) 

499 

500 def DeleteUser( 

501 self, request: admin_pb2.DeleteUserReq, context: CouchersContext, session: Session 

502 ) -> admin_pb2.UserDetails: 

503 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

504 if not user: 504 ↛ 505line 504 didn't jump to line 505 because the condition on line 504 was never true

505 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

506 user.deleted_at = now() 

507 log_admin_action(session, context, user, "delete_user", level=AdminActionLevel.high) 

508 return _user_to_details(session, user) 

509 

510 def RecoverDeletedUser( 

511 self, request: admin_pb2.RecoverDeletedUserReq, context: CouchersContext, session: Session 

512 ) -> admin_pb2.UserDetails: 

513 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

514 if not user: 514 ↛ 515line 514 didn't jump to line 515 because the condition on line 514 was never true

515 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

516 user.deleted_at = None 

517 user.undelete_token = None 

518 user.undelete_until = None 

519 log_admin_action(session, context, user, "recover_user", level=AdminActionLevel.high) 

520 return _user_to_details(session, user) 

521 

522 def CreateApiKey( 

523 self, request: admin_pb2.CreateApiKeyReq, context: CouchersContext, session: Session 

524 ) -> admin_pb2.CreateApiKeyRes: 

525 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

526 if not user: 526 ↛ 527line 526 didn't jump to line 527 because the condition on line 526 was never true

527 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

528 token, expiry = create_session( 

529 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False 

530 ) 

531 log_admin_action(session, context, user, "create_api_key") 

532 

533 notify( 

534 session, 

535 user_id=user.id, 

536 topic_action=NotificationTopicAction.api_key__create, 

537 key="", 

538 data=notification_data_pb2.ApiKeyCreate( 

539 api_key=token, 

540 expiry=Timestamp_from_datetime(expiry), 

541 ), 

542 ) 

543 

544 return admin_pb2.CreateApiKeyRes() 

545 

546 def GetChats( 

547 self, request: admin_pb2.GetChatsReq, context: CouchersContext, session: Session 

548 ) -> admin_pb2.GetChatsRes: 

549 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

550 if not user: 550 ↛ 551line 550 didn't jump to line 551 because the condition on line 550 was never true

551 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

552 

553 # Cache for ChatUserInfo to avoid recomputing for the same user 

554 user_info_cache = {} 

555 

556 def get_chat_user_info(user_id: int) -> admin_pb2.ChatUserInfo: 

557 if user_id not in user_info_cache: 557 ↛ 566line 557 didn't jump to line 566 because the condition on line 557 was always true

558 u = session.execute(select(User).where(User.id == user_id)).scalar_one() 

559 user_info_cache[user_id] = admin_pb2.ChatUserInfo( 

560 user_id=u.id, 

561 username=u.username, 

562 name=u.name, 

563 birthdate=date_to_api(u.birthdate), 

564 gender=u.gender, 

565 ) 

566 return user_info_cache[user_id] 

567 

568 def message_to_pb(message: Message) -> admin_pb2.ChatMessage: 

569 return admin_pb2.ChatMessage( 

570 message_id=message.id, 

571 author=get_chat_user_info(message.author_id), 

572 time=Timestamp_from_datetime(message.time), 

573 message_type=message.message_type.name if message.message_type else "", 

574 text=message.text or "", 

575 host_request_status_target=( 

576 message.host_request_status_target.name if message.host_request_status_target else "" 

577 ), 

578 target=get_chat_user_info(message.target_id) if message.target_id else None, 

579 ) 

580 

581 def get_messages_for_conversation(conversation_id: int) -> list[admin_pb2.ChatMessage]: 

582 messages = ( 

583 session.execute( 

584 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc()) 

585 ) 

586 .scalars() 

587 .all() 

588 ) 

589 return [message_to_pb(msg) for msg in messages] 

590 

591 def get_host_request_pb(host_request: HostRequest) -> admin_pb2.AdminHostRequest: 

592 return admin_pb2.AdminHostRequest( 

593 host_request_id=host_request.conversation_id, 

594 surfer=get_chat_user_info(host_request.initiator_user_id), 

595 host=get_chat_user_info(host_request.recipient_user_id), 

596 status=host_request.status.name if host_request.status else "", 

597 from_date=date_to_api(host_request.from_date), 

598 to_date=date_to_api(host_request.to_date), 

599 created=Timestamp_from_datetime(host_request.conversation.created), 

600 messages=get_messages_for_conversation(host_request.conversation_id), 

601 ) 

602 

603 def get_group_chat_pb(group_chat: GroupChat) -> admin_pb2.AdminGroupChat: 

604 subs = ( 

605 session.execute( 

606 select(GroupChatSubscription) 

607 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id) 

608 .order_by(GroupChatSubscription.joined.asc()) 

609 ) 

610 .scalars() 

611 .all() 

612 ) 

613 members = [ 

614 admin_pb2.GroupChatMember( 

615 user=get_chat_user_info(sub.user_id), 

616 joined=Timestamp_from_datetime(sub.joined), 

617 left=Timestamp_from_datetime(sub.left) if sub.left else None, 

618 role=sub.role.name if sub.role else "", 

619 ) 

620 for sub in subs 

621 ] 

622 return admin_pb2.AdminGroupChat( 

623 group_chat_id=group_chat.conversation_id, 

624 title=group_chat.title or "", 

625 is_dm=group_chat.is_dm, 

626 creator=get_chat_user_info(group_chat.creator_id), 

627 members=members, 

628 messages=get_messages_for_conversation(group_chat.conversation_id), 

629 ) 

630 

631 # Get all host requests for the user 

632 host_requests = ( 

633 session.execute( 

634 select(HostRequest) 

635 .where(or_(HostRequest.recipient_user_id == user.id, HostRequest.initiator_user_id == user.id)) 

636 .order_by(HostRequest.conversation_id.desc()) 

637 ) 

638 .scalars() 

639 .all() 

640 ) 

641 

642 # Get all group chats for the user 

643 group_chat_ids = ( 

644 session.execute( 

645 select(GroupChatSubscription.group_chat_id) 

646 .where(GroupChatSubscription.user_id == user.id) 

647 .order_by(GroupChatSubscription.joined.desc()) 

648 ) 

649 .scalars() 

650 .all() 

651 ) 

652 group_chats = ( 

653 session.execute(select(GroupChat).where(GroupChat.conversation_id.in_(group_chat_ids))).scalars().all() 

654 ) 

655 

656 # Build protobuf objects, then sort by latest message time (most recent first) 

657 host_request_pbs = [get_host_request_pb(hr) for hr in host_requests] 

658 host_request_pbs.sort(key=lambda hr: hr.messages[-1].time.seconds if hr.messages else 0, reverse=True) 

659 

660 group_chat_pbs = [get_group_chat_pb(gc) for gc in group_chats] 

661 group_chat_pbs.sort(key=lambda gc: gc.messages[-1].time.seconds if gc.messages else 0, reverse=True) 

662 

663 return admin_pb2.GetChatsRes( 

664 user=get_chat_user_info(user.id), 

665 host_requests=host_request_pbs, 

666 group_chats=group_chat_pbs, 

667 ) 

668 

669 def DeleteEvent( 

670 self, request: admin_pb2.DeleteEventReq, context: CouchersContext, session: Session 

671 ) -> empty_pb2.Empty: 

672 res = session.execute( 

673 select(Event, EventOccurrence) 

674 .where(EventOccurrence.id == request.event_id) 

675 .where(EventOccurrence.event_id == Event.id) 

676 .where(~EventOccurrence.is_deleted) 

677 ).one_or_none() 

678 

679 if not res: 679 ↛ 680line 679 didn't jump to line 680 because the condition on line 679 was never true

680 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found") 

681 

682 event, occurrence = res 

683 

684 occurrence.is_deleted = True 

685 

686 queue_job( 

687 session, 

688 job=generate_event_delete_notifications, 

689 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload( 

690 occurrence_id=occurrence.id, 

691 ), 

692 ) 

693 

694 return empty_pb2.Empty() 

695 

696 def ListUserIds( 

697 self, request: admin_pb2.ListUserIdsReq, context: CouchersContext, session: Session 

698 ) -> admin_pb2.ListUserIdsRes: 

699 start_date = to_aware_datetime(request.start_time) 

700 end_date = to_aware_datetime(request.end_time) 

701 

702 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

703 next_user_id = int(request.page_token) if request.page_token else 0 

704 

705 user_ids = ( 

706 session.execute( 

707 select(User.id) 

708 .where(or_(User.id <= next_user_id, to_bool(next_user_id == 0))) 

709 .where(User.joined >= start_date) 

710 .where(User.joined <= end_date) 

711 .order_by(User.id.desc()) 

712 .limit(page_size + 1) 

713 ) 

714 .scalars() 

715 .all() 

716 ) 

717 

718 return admin_pb2.ListUserIdsRes( 

719 user_ids=user_ids[:page_size], 

720 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None, 

721 ) 

722 

723 def EditReferenceText( 

724 self, request: admin_pb2.EditReferenceTextReq, context: CouchersContext, session: Session 

725 ) -> empty_pb2.Empty: 

726 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none() 

727 

728 if reference is None: 728 ↛ 729line 728 didn't jump to line 729 because the condition on line 728 was never true

729 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found") 

730 

731 if not request.new_text.strip(): 731 ↛ 732line 731 didn't jump to line 732 because the condition on line 731 was never true

732 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "reference_no_text") 

733 

734 reference.text = request.new_text.strip() 

735 # Log action against the reference author 

736 author = session.execute(select(User).where(User.id == reference.from_user_id)).scalar_one() 

737 log_admin_action(session, context, author, "edit_reference", note=f"Edited reference {reference.id}") 

738 return empty_pb2.Empty() 

739 

740 def DeleteReference( 

741 self, request: admin_pb2.DeleteReferenceReq, context: CouchersContext, session: Session 

742 ) -> empty_pb2.Empty: 

743 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none() 

744 

745 if reference is None: 745 ↛ 746line 745 didn't jump to line 746 because the condition on line 745 was never true

746 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found") 

747 

748 reference.is_deleted = True 

749 # Log action against the reference author 

750 author = session.execute(select(User).where(User.id == reference.from_user_id)).scalar_one() 

751 log_admin_action(session, context, author, "delete_reference", note=f"Deleted reference {reference.id}") 

752 return empty_pb2.Empty() 

753 

754 def GetUserReferences( 

755 self, request: admin_pb2.GetUserReferencesReq, context: CouchersContext, session: Session 

756 ) -> admin_pb2.GetUserReferencesRes: 

757 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

758 if not user: 

759 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

760 

761 references_from = ( 

762 session.execute(select(Reference).where(Reference.from_user_id == user.id).order_by(Reference.id.desc())) 

763 .scalars() 

764 .all() 

765 ) 

766 

767 references_to = ( 

768 session.execute(select(Reference).where(Reference.to_user_id == user.id).order_by(Reference.id.desc())) 

769 .scalars() 

770 .all() 

771 ) 

772 

773 return admin_pb2.GetUserReferencesRes( 

774 references_from=[_reference_to_pb(ref) for ref in references_from], 

775 references_to=[_reference_to_pb(ref) for ref in references_to], 

776 ) 

777 

778 def GetFriendRequests( 

779 self, request: admin_pb2.GetFriendRequestsReq, context: CouchersContext, session: Session 

780 ) -> admin_pb2.GetFriendRequestsRes: 

781 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

782 if not user: 

783 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

784 

785 user_info_cache: dict[int, admin_pb2.ChatUserInfo] = {} 

786 

787 def get_chat_user_info(user_id: int) -> admin_pb2.ChatUserInfo: 

788 if user_id not in user_info_cache: 

789 u = session.execute(select(User).where(User.id == user_id)).scalar_one() 

790 user_info_cache[user_id] = admin_pb2.ChatUserInfo( 

791 user_id=u.id, 

792 username=u.username, 

793 name=u.name, 

794 birthdate=date_to_api(u.birthdate), 

795 gender=u.gender, 

796 ) 

797 return user_info_cache[user_id] 

798 

799 def friend_request_to_pb(rel: FriendRelationship) -> admin_pb2.AdminFriendRequest: 

800 return admin_pb2.AdminFriendRequest( 

801 friend_request_id=rel.id, 

802 from_user=get_chat_user_info(rel.from_user_id), 

803 to_user=get_chat_user_info(rel.to_user_id), 

804 status=rel.status.name if rel.status else "", 

805 time_sent=Timestamp_from_datetime(rel.time_sent), 

806 time_responded=Timestamp_from_datetime(rel.time_responded) if rel.time_responded else None, 

807 moderation_visibility=rel.moderation_state.visibility.name, 

808 ) 

809 

810 sent = ( 

811 session.execute( 

812 select(FriendRelationship) 

813 .where(FriendRelationship.from_user_id == user.id) 

814 .order_by(FriendRelationship.id.desc()) 

815 ) 

816 .scalars() 

817 .all() 

818 ) 

819 

820 received = ( 

821 session.execute( 

822 select(FriendRelationship) 

823 .where(FriendRelationship.to_user_id == user.id) 

824 .order_by(FriendRelationship.id.desc()) 

825 ) 

826 .scalars() 

827 .all() 

828 ) 

829 

830 return admin_pb2.GetFriendRequestsRes( 

831 sent=[friend_request_to_pb(rel) for rel in sent], 

832 received=[friend_request_to_pb(rel) for rel in received], 

833 ) 

834 

835 def EditDiscussion( 

836 self, request: admin_pb2.EditDiscussionReq, context: CouchersContext, session: Session 

837 ) -> empty_pb2.Empty: 

838 discussion = session.execute( 

839 select(Discussion).where(Discussion.id == request.discussion_id) 

840 ).scalar_one_or_none() 

841 if not discussion: 

842 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found") 

843 if request.new_title: 

844 discussion.title = request.new_title.strip() 

845 if request.new_content: 

846 discussion.content = request.new_content.strip() 

847 return empty_pb2.Empty() 

848 

849 def EditReply(self, request: admin_pb2.EditReplyReq, context: CouchersContext, session: Session) -> empty_pb2.Empty: 

850 database_id, depth = unpack_thread_id(request.reply_id) 

851 if depth == 1: 

852 obj: Comment | Reply | None = session.execute( 

853 select(Comment).where(Comment.id == database_id) 

854 ).scalar_one_or_none() 

855 elif depth == 2: 

856 obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none() 

857 else: 

858 obj = None 

859 

860 if not obj: 

861 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "object_not_found") 

862 obj.content = request.new_content.strip() 

863 return empty_pb2.Empty() 

864 

865 def AddUsersToModerationUserList( 

866 self, request: admin_pb2.AddUsersToModerationUserListReq, context: CouchersContext, session: Session 

867 ) -> admin_pb2.AddUsersToModerationUserListRes: 

868 """Add multiple users to a moderation user list. If no moderation list is provided, a new one is created. 

869 Id of the moderation list is returned.""" 

870 req_users = request.users 

871 users = [] 

872 

873 for req_user in req_users: 

874 user = session.execute(select(User).where(username_or_email_or_id(req_user))).scalar_one_or_none() 

875 if not user: 

876 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

877 users.append(user) 

878 

879 if request.moderation_list_id: 

880 moderation_user_list = session.get(ModerationUserList, request.moderation_list_id) 

881 if not moderation_user_list: 

882 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found") 

883 # Create a new moderation user list if no one is provided 

884 else: 

885 moderation_user_list = ModerationUserList() 

886 session.add(moderation_user_list) 

887 session.flush() 

888 

889 # Add users to the moderation list only if not already in it 

890 for user in users: 

891 if user not in moderation_user_list.users: 891 ↛ 893line 891 didn't jump to line 893 because the condition on line 891 was always true

892 moderation_user_list.users.append(user) 

893 log_admin_action(session, context, user, "add_to_moderation_list") 

894 

895 return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id) 

896 

897 def ListModerationUserLists( 

898 self, request: admin_pb2.ListModerationUserListsReq, context: CouchersContext, session: Session 

899 ) -> admin_pb2.ListModerationUserListsRes: 

900 """Lists all moderation user lists for a user.""" 

901 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

902 if not user: 902 ↛ 903line 902 didn't jump to line 903 because the condition on line 902 was never true

903 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

904 

905 moderation_lists = [ 

906 admin_pb2.ModerationList( 

907 moderation_list_id=ml.id, 

908 members=[_user_to_details(session, u) for u in ml.users], 

909 ) 

910 for ml in user.moderation_user_lists 

911 ] 

912 return admin_pb2.ListModerationUserListsRes(moderation_lists=moderation_lists) 

913 

914 def RemoveUserFromModerationUserList( 

915 self, request: admin_pb2.RemoveUserFromModerationUserListReq, context: CouchersContext, session: Session 

916 ) -> empty_pb2.Empty: 

917 """Removes a user from a provided moderation user list.""" 

918 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

919 if not user: 

920 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

921 if not request.moderation_list_id: 

922 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_moderation_user_list_id") 

923 

924 moderation_user_list = session.get(ModerationUserList, request.moderation_list_id) 

925 if not moderation_user_list: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true

926 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found") 

927 if user not in moderation_user_list.users: 

928 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_the_moderation_user_list") 

929 

930 moderation_user_list.users.remove(user) 

931 log_admin_action(session, context, user, "remove_from_moderation_list") 

932 

933 if len(moderation_user_list.users) == 0: 

934 session.delete(moderation_user_list) 

935 

936 return empty_pb2.Empty() 

937 

938 def CreateAccountDeletionLink( 

939 self, request: admin_pb2.CreateAccountDeletionLinkReq, context: CouchersContext, session: Session 

940 ) -> admin_pb2.CreateAccountDeletionLinkRes: 

941 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

942 if not user: 942 ↛ 943line 942 didn't jump to line 943 because the condition on line 942 was never true

943 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

944 token = AccountDeletionToken(token=urlsafe_secure_token(), user_id=user.id, expiry=now() + timedelta(hours=2)) 

945 session.add(token) 

946 log_admin_action(session, context, user, "create_account_deletion_link", level=AdminActionLevel.high) 

947 return admin_pb2.CreateAccountDeletionLinkRes( 

948 account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token) 

949 ) 

950 

951 def AccessStats( 

952 self, request: admin_pb2.AccessStatsReq, context: CouchersContext, session: Session 

953 ) -> admin_pb2.AccessStatsRes: 

954 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

955 if not user: 955 ↛ 956line 955 didn't jump to line 956 because the condition on line 955 was never true

956 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

957 

958 start_time = ( 

959 to_aware_datetime(request.start_time) if request.HasField("start_time") else now() - timedelta(days=90) 

960 ) 

961 end_time = to_aware_datetime(request.end_time) if request.HasField("end_time") else now() 

962 

963 user_activity = session.execute( 

964 select( 

965 UserActivity.ip_address, 

966 UserActivity.user_agent, 

967 func.sum(UserActivity.api_calls), 

968 func.count(UserActivity.period), 

969 func.min(UserActivity.period), 

970 func.max(UserActivity.period), 

971 ) 

972 .where(UserActivity.user_id == user.id) 

973 .where(UserActivity.period >= start_time) 

974 .where(UserActivity.period <= end_time) 

975 .order_by(func.max(UserActivity.period).desc()) 

976 .group_by(UserActivity.ip_address, UserActivity.user_agent) 

977 ).all() 

978 

979 out = admin_pb2.AccessStatsRes() 

980 

981 for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity: 

982 ip_address_str = str(ip_address) if ip_address is not None else None 

983 user_agent_data = user_agents_parse(user_agent or "") 

984 asn = geoip_asn(ip_address_str) 

985 out.stats.append( 

986 admin_pb2.AccessStat( 

987 ip_address=ip_address_str, 

988 asn=str(asn[0]) if asn else None, 

989 asorg=str(asn[1]) if asn else None, 

990 asnetwork=str(asn[2]) if asn else None, 

991 user_agent=user_agent, 

992 operating_system=user_agent_data.os.family, 

993 browser=user_agent_data.browser.family, 

994 device=user_agent_data.device.family, 

995 approximate_location=geoip_approximate_location(ip_address_str) or "Unknown", 

996 api_call_count=api_call_count, 

997 periods_count=periods_count, 

998 first_seen=Timestamp_from_datetime(first_seen), 

999 last_seen=Timestamp_from_datetime(last_seen), 

1000 ) 

1001 ) 

1002 

1003 return out 

1004 

1005 def SetLastDonated( 

1006 self, request: admin_pb2.SetLastDonatedReq, context: CouchersContext, session: Session 

1007 ) -> admin_pb2.UserDetails: 

1008 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

1009 if not user: 

1010 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

1011 

1012 if request.HasField("last_donated"): 

1013 user.last_donated = to_aware_datetime(request.last_donated) 

1014 else: 

1015 user.last_donated = None 

1016 

1017 log_admin_action(session, context, user, "set_last_donated") 

1018 return _user_to_details(session, user) 

1019 

1020 def CreateAdminTag( 

1021 self, request: admin_pb2.CreateAdminTagReq, context: CouchersContext, session: Session 

1022 ) -> admin_pb2.AdminTagInfo: 

1023 if not request.tag.strip(): 

1024 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_tag_cant_be_empty") 

1025 existing = session.execute(select(AdminTag).where(AdminTag.tag == request.tag.strip())).scalar_one_or_none() 

1026 if existing: 

1027 context.abort_with_error_code(grpc.StatusCode.ALREADY_EXISTS, "admin_tag_already_exists") 

1028 admin_tag = AdminTag(tag=request.tag.strip()) 

1029 session.add(admin_tag) 

1030 session.flush() 

1031 return admin_pb2.AdminTagInfo(admin_tag_id=admin_tag.id, tag=admin_tag.tag) 

1032 

1033 def ListAdminTags( 

1034 self, request: admin_pb2.ListAdminTagsReq, context: CouchersContext, session: Session 

1035 ) -> admin_pb2.ListAdminTagsRes: 

1036 tags = session.execute(select(AdminTag).order_by(AdminTag.tag)).scalars().all() 

1037 return admin_pb2.ListAdminTagsRes( 

1038 tags=[admin_pb2.AdminTagInfo(admin_tag_id=tag.id, tag=tag.tag) for tag in tags] 

1039 ) 

1040 

1041 def AddAdminTagToUser( 

1042 self, request: admin_pb2.AddAdminTagToUserReq, context: CouchersContext, session: Session 

1043 ) -> admin_pb2.UserDetails: 

1044 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

1045 if not user: 1045 ↛ 1046line 1045 didn't jump to line 1046 because the condition on line 1045 was never true

1046 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

1047 admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none() 

1048 if not admin_tag: 

1049 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin_tag_not_found") 

1050 existing = session.execute( 

1051 select(UserAdminTag).where(UserAdminTag.user_id == user.id, UserAdminTag.admin_tag_id == admin_tag.id) 

1052 ).scalar_one_or_none() 

1053 if existing: 

1054 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_admin_tag") 

1055 session.add(UserAdminTag(user_id=user.id, admin_tag_id=admin_tag.id)) 

1056 session.flush() 

1057 log_admin_action(session, context, user, "add_tag", tag=request.tag) 

1058 return _user_to_details(session, user) 

1059 

1060 def RemoveAdminTagFromUser( 

1061 self, request: admin_pb2.RemoveAdminTagFromUserReq, context: CouchersContext, session: Session 

1062 ) -> admin_pb2.UserDetails: 

1063 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

1064 if not user: 1064 ↛ 1065line 1064 didn't jump to line 1065 because the condition on line 1064 was never true

1065 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

1066 admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none() 

1067 if not admin_tag: 1067 ↛ 1068line 1067 didn't jump to line 1068 because the condition on line 1067 was never true

1068 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin_tag_not_found") 

1069 user_admin_tag = session.execute( 

1070 select(UserAdminTag).where(UserAdminTag.user_id == user.id, UserAdminTag.admin_tag_id == admin_tag.id) 

1071 ).scalar_one_or_none() 

1072 if not user_admin_tag: 

1073 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_admin_tag") 

1074 session.delete(user_admin_tag) 

1075 session.flush() 

1076 log_admin_action(session, context, user, "remove_tag", tag=request.tag) 

1077 return _user_to_details(session, user) 

1078 

1079 def SetModScore( 

1080 self, request: admin_pb2.SetModScoreReq, context: CouchersContext, session: Session 

1081 ) -> admin_pb2.UserDetails: 

1082 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none() 

1083 if not user: 

1084 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

1085 user.mod_score = request.mod_score 

1086 log_admin_action(session, context, user, "set_mod_score", note=f"mod_score={request.mod_score}") 

1087 return _user_to_details(session, user)