Coverage for app / backend / src / couchers / servicers / admin.py: 77%
471 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 09:44 +0000
1import logging
2from datetime import timedelta
4import grpc
5from google.protobuf import empty_pb2
6from sqlalchemy import select
7from sqlalchemy.orm import Session, selectinload
8from sqlalchemy.sql import and_, func, or_
9from user_agents import parse as user_agents_parse
11from couchers import urls
12from couchers.context import CouchersContext
13from couchers.crypto import urlsafe_secure_token
14from couchers.helpers.badges import user_add_badge, user_remove_badge
15from couchers.helpers.geoip import geoip_approximate_location, geoip_asn
16from couchers.helpers.strong_verification import get_strong_verification_fields
17from couchers.jobs.enqueue import queue_job
18from couchers.models import (
19 AccountDeletionToken,
20 AdminAction,
21 AdminActionLevel,
22 AdminTag,
23 Comment,
24 ContentReport,
25 Discussion,
26 Event,
27 EventOccurrence,
28 FriendRelationship,
29 GroupChat,
30 GroupChatSubscription,
31 HostRequest,
32 LanguageAbility,
33 Message,
34 ModerationUserList,
35 ModNote,
36 Reference,
37 Reply,
38 User,
39 UserActivity,
40 UserAdminTag,
41 UserBadge,
42)
43from couchers.models.notifications import NotificationTopicAction
44from couchers.models.uploads import has_avatar_photo_expression
45from couchers.notifications.notify import notify
46from couchers.proto import admin_pb2, admin_pb2_grpc, api_pb2, notification_data_pb2
47from couchers.proto.internal import jobs_pb2
48from couchers.resources import get_badge_dict
49from couchers.servicers.api import user_model_to_pb
50from couchers.servicers.auth import create_session
51from couchers.servicers.events import generate_event_delete_notifications
52from couchers.servicers.threads import unpack_thread_id
53from couchers.sql import to_bool, username_or_email_or_id
54from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime
logger = logging.getLogger(__name__)

# Default and maximum page size used by the paginated admin RPCs in this module.
MAX_PAGINATION_LENGTH = 250


# Model enum -> protobuf enum value.
adminactionlevel2api = {
    AdminActionLevel.debug: admin_pb2.ADMIN_ACTION_LEVEL_DEBUG,
    AdminActionLevel.normal: admin_pb2.ADMIN_ACTION_LEVEL_NORMAL,
    AdminActionLevel.high: admin_pb2.ADMIN_ACTION_LEVEL_HIGH,
}

# Protobuf enum value -> model enum (the exact inverse of the mapping above).
api2adminactionlevel = {api_level: model_level for model_level, api_level in adminactionlevel2api.items()}
def log_admin_action(
    session: Session,
    context: CouchersContext,
    target_user: User,
    action_type: str,
    note: str | None = None,
    tag: str | None = None,
    level: AdminActionLevel = AdminActionLevel.normal,
) -> AdminAction:
    """Record an admin action taken against ``target_user``.

    The acting admin is read from ``context.user_id``. The new row is added to
    ``session`` and flushed (not committed) before it is returned.
    """
    fields = {
        "admin_user_id": context.user_id,
        "target_user_id": target_user.id,
        "action_type": action_type,
        "level": level,
        "note": note,
        "tag": tag,
    }
    entry = AdminAction(**fields)
    session.add(entry)
    session.flush()
    return entry
def _user_to_details(session: Session, user: User) -> admin_pb2.UserDetails:
    """Build the admin-facing ``UserDetails`` message for ``user``.

    Besides the user's own columns, this loads the admin action log targeting
    the user (oldest first, with each acting admin's username), the user's
    admin tags (sorted by tag), and the pending/acknowledged mod note counts.
    """
    # Admin action log for this user, joined with the acting admin's username.
    action_rows = session.execute(
        select(AdminAction, User.username)
        .join(User, AdminAction.admin_user_id == User.id)
        .where(AdminAction.target_user_id == user.id)
        .order_by(AdminAction.created.asc())
    ).all()

    action_pbs = [
        admin_pb2.AdminActionLog(
            admin_action_id=action.id,
            created=Timestamp_from_datetime(action.created),
            admin_user_id=action.admin_user_id,
            admin_username=admin_username,
            action_type=action.action_type,
            level=adminactionlevel2api[action.level],
            # note and tag are nullable on the model; the message gets "" instead.
            note=action.note or "",
            tag=action.tag or "",
        )
        for action, admin_username in action_rows
    ]

    # Names of the admin tags attached to this user.
    tag_query = (
        select(AdminTag.tag)
        .join(UserAdminTag, UserAdminTag.admin_tag_id == AdminTag.id)
        .where(UserAdminTag.user_id == user.id)
        .order_by(AdminTag.tag)
    )
    admin_tags = session.execute(tag_query).scalars().all()

    return admin_pb2.UserDetails(
        user_id=user.id,
        username=user.username,
        name=user.name,
        email=user.email,
        gender=user.gender,
        birthdate=date_to_api(user.birthdate),
        banned=user.banned_at is not None,
        deleted=user.deleted_at is not None,
        do_not_email=user.do_not_email,
        badges=[badge.badge_id for badge in user.badges],
        **get_strong_verification_fields(session, user),
        has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
        pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(),
        acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(),
        admin_actions=action_pbs,
        admin_tags=list(admin_tags),
        mod_score=user.mod_score,
    )
def _content_report_to_pb(content_report: ContentReport) -> admin_pb2.ContentReport:
    """Copy a ``ContentReport`` model row into its protobuf representation."""
    report = content_report
    report_pb = admin_pb2.ContentReport(
        content_report_id=report.id,
        time=Timestamp_from_datetime(report.time),
        reporting_user_id=report.reporting_user_id,
        author_user_id=report.author_user_id,
        reason=report.reason,
        description=report.description,
        content_ref=report.content_ref,
        user_agent=report.user_agent,
        page=report.page,
    )
    return report_pb
def _reference_to_pb(reference: Reference) -> admin_pb2.AdminReference:
    """Copy a ``Reference`` model row into an ``AdminReference`` message.

    A missing private text becomes ``""`` and a missing host request id
    becomes ``0``.
    """
    ref = reference
    reference_pb = admin_pb2.AdminReference(
        reference_id=ref.id,
        from_user_id=ref.from_user_id,
        to_user_id=ref.to_user_id,
        reference_type=ref.reference_type.name,
        text=ref.text,
        private_text=ref.private_text or "",
        time=Timestamp_from_datetime(ref.time),
        host_request_id=ref.host_request_id or 0,
        rating=ref.rating,
        was_appropriate=ref.was_appropriate,
        is_deleted=ref.is_deleted,
    )
    return reference_pb
183class Admin(admin_pb2_grpc.AdminServicer):
184 def GetUserDetails(
185 self, request: admin_pb2.GetUserDetailsReq, context: CouchersContext, session: Session
186 ) -> admin_pb2.UserDetails:
187 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
188 if not user: 188 ↛ 189line 188 didn't jump to line 189 because the condition on line 188 was never true
189 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
190 return _user_to_details(session, user)
192 def GetUser(self, request: admin_pb2.GetUserReq, context: CouchersContext, session: Session) -> api_pb2.User:
193 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
194 if not user: 194 ↛ 195line 194 didn't jump to line 195 because the condition on line 194 was never true
195 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
196 return user_model_to_pb(user, session, context, is_admin_see_ghosts=True)
198 def SearchUsers(
199 self, request: admin_pb2.SearchUsersReq, context: CouchersContext, session: Session
200 ) -> admin_pb2.SearchUsersRes:
201 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
202 next_user_id = int(request.page_token) if request.page_token else 0
203 statement = select(User)
204 if request.username: 204 ↛ 205line 204 didn't jump to line 205 because the condition on line 204 was never true
205 statement = statement.where(User.username.ilike(request.username))
206 if request.email: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true
207 statement = statement.where(User.email.ilike(request.email))
208 if request.name: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 statement = statement.where(User.name.ilike(request.name))
210 if request.admin_action_log:
211 statement = statement.where(
212 User.id.in_(select(AdminAction.target_user_id).where(AdminAction.note.ilike(request.admin_action_log)))
213 )
214 if request.city: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true
215 statement = statement.where(User.city.ilike(request.city))
216 if request.min_user_id: 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true
217 statement = statement.where(User.id >= request.min_user_id)
218 if request.max_user_id: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true
219 statement = statement.where(User.id <= request.max_user_id)
220 if request.min_birthdate: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true
221 statement = statement.where(User.birthdate >= parse_date(request.min_birthdate))
222 if request.max_birthdate: 222 ↛ 223line 222 didn't jump to line 223 because the condition on line 222 was never true
223 statement = statement.where(User.birthdate <= parse_date(request.max_birthdate))
224 if request.genders: 224 ↛ 225line 224 didn't jump to line 225 because the condition on line 224 was never true
225 statement = statement.where(User.gender.in_(request.genders))
226 if request.min_joined_date: 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true
227 statement = statement.where(User.joined >= parse_date(request.min_joined_date))
228 if request.max_joined_date: 228 ↛ 229line 228 didn't jump to line 229 because the condition on line 228 was never true
229 statement = statement.where(User.joined <= parse_date(request.max_joined_date))
230 if request.min_last_active_date: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 statement = statement.where(User.last_active >= parse_date(request.min_last_active_date))
232 if request.max_last_active_date: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true
233 statement = statement.where(User.last_active <= parse_date(request.max_last_active_date))
234 if request.genders: 234 ↛ 235line 234 didn't jump to line 235 because the condition on line 234 was never true
235 statement = statement.where(User.gender.in_(request.genders))
236 if request.language_codes: 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true
237 statement = statement.join(
238 LanguageAbility,
239 and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)),
240 )
241 if request.HasField("is_deleted"): 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 statement = statement.where((User.deleted_at != None) == request.is_deleted.value)
243 if request.HasField("is_banned"): 243 ↛ 244line 243 didn't jump to line 244 because the condition on line 243 was never true
244 statement = statement.where((User.banned_at != None) == request.is_banned.value)
245 if request.HasField("has_avatar"): 245 ↛ 246line 245 didn't jump to line 246 because the condition on line 245 was never true
246 statement = statement.where(has_avatar_photo_expression(User) == request.has_avatar.value)
247 if request.admin_tags:
248 for tag_name in request.admin_tags:
249 statement = statement.where(
250 User.id.in_(
251 select(UserAdminTag.user_id)
252 .join(AdminTag, UserAdminTag.admin_tag_id == AdminTag.id)
253 .where(AdminTag.tag == tag_name)
254 )
255 )
256 users = (
257 session.execute(
258 statement.where(User.id >= next_user_id)
259 .order_by(User.id)
260 .limit(page_size + 1)
261 .options(selectinload(User.badges))
262 )
263 .scalars()
264 .all()
265 )
266 logger.info(users)
267 return admin_pb2.SearchUsersRes(
268 users=[_user_to_details(session, user) for user in users[:page_size]],
269 next_page_token=str(users[-1].id) if len(users) > page_size else None,
270 )
272 def ChangeUserGender(
273 self, request: admin_pb2.ChangeUserGenderReq, context: CouchersContext, session: Session
274 ) -> admin_pb2.UserDetails:
275 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
276 if not user: 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true
277 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
278 old_gender = user.gender
279 user.gender = request.gender
280 log_admin_action(
281 session, context, user, "change_gender", note=f"Changed from '{old_gender}' to '{request.gender}'"
282 )
283 session.commit()
285 notify(
286 session,
287 user_id=user.id,
288 topic_action=NotificationTopicAction.gender__change,
289 key="",
290 data=notification_data_pb2.GenderChange(
291 gender=request.gender,
292 ),
293 )
295 return _user_to_details(session, user)
297 def ChangeUserBirthdate(
298 self, request: admin_pb2.ChangeUserBirthdateReq, context: CouchersContext, session: Session
299 ) -> admin_pb2.UserDetails:
300 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
301 if not user: 301 ↛ 302line 301 didn't jump to line 302 because the condition on line 301 was never true
302 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
303 if not (birthdate := parse_date(request.birthdate)): 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true
304 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_birthdate")
306 old_birthdate = user.birthdate
307 user.birthdate = birthdate
308 log_admin_action(
309 session, context, user, "change_birthdate", note=f"Changed from {old_birthdate} to {request.birthdate}"
310 )
311 session.commit()
313 notify(
314 session,
315 user_id=user.id,
316 topic_action=NotificationTopicAction.birthdate__change,
317 key="",
318 data=notification_data_pb2.BirthdateChange(
319 birthdate=request.birthdate,
320 ),
321 )
323 return _user_to_details(session, user)
325 def AddBadge(
326 self, request: admin_pb2.AddBadgeReq, context: CouchersContext, session: Session
327 ) -> admin_pb2.UserDetails:
328 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
329 if not user: 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true
330 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
332 badge = get_badge_dict().get(request.badge_id)
333 if not badge:
334 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")
336 if not badge.admin_editable:
337 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")
339 if badge.id in [b.badge_id for b in user.badges]:
340 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_badge")
342 user_add_badge(session, user.id, request.badge_id)
343 log_admin_action(session, context, user, "add_badge", note=f"Added badge {request.badge_id}")
345 return _user_to_details(session, user)
347 def RemoveBadge(
348 self, request: admin_pb2.RemoveBadgeReq, context: CouchersContext, session: Session
349 ) -> admin_pb2.UserDetails:
350 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
351 if not user: 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true
352 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
354 badge = get_badge_dict().get(request.badge_id)
355 if not badge: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true
356 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")
358 if not badge.admin_editable: 358 ↛ 359line 358 didn't jump to line 359 because the condition on line 358 was never true
359 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin_cannot_edit_badge")
361 user_badge = session.execute(
362 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge.id)
363 ).scalar_one_or_none()
364 if not user_badge:
365 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_badge")
367 user_remove_badge(session, user.id, request.badge_id)
368 log_admin_action(session, context, user, "remove_badge", note=f"Removed badge {request.badge_id}")
370 return _user_to_details(session, user)
372 def SetPassportSexGenderException(
373 self, request: admin_pb2.SetPassportSexGenderExceptionReq, context: CouchersContext, session: Session
374 ) -> admin_pb2.UserDetails:
375 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
376 if not user: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true
377 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
378 old_exception = user.has_passport_sex_gender_exception
379 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
380 log_admin_action(
381 session,
382 context,
383 user,
384 "set_passport_sex_gender_exception",
385 note=f"Changed from {old_exception} to {request.passport_sex_gender_exception}",
386 )
387 return _user_to_details(session, user)
389 def BanUser(
390 self, request: admin_pb2.BanUserReq, context: CouchersContext, session: Session
391 ) -> admin_pb2.UserDetails:
392 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
393 if not user: 393 ↛ 394line 393 didn't jump to line 394 because the condition on line 393 was never true
394 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
395 if not request.admin_note.strip(): 395 ↛ 396line 395 didn't jump to line 396 because the condition on line 395 was never true
396 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")
397 log_admin_action(session, context, user, "ban", note=request.admin_note, level=AdminActionLevel.high)
398 user.banned_at = now()
399 return _user_to_details(session, user)
401 def UnbanUser(
402 self, request: admin_pb2.UnbanUserReq, context: CouchersContext, session: Session
403 ) -> admin_pb2.UserDetails:
404 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
405 if not user: 405 ↛ 406line 405 didn't jump to line 406 because the condition on line 405 was never true
406 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
407 if not request.admin_note.strip(): 407 ↛ 408line 407 didn't jump to line 408 because the condition on line 407 was never true
408 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")
409 log_admin_action(session, context, user, "unban", note=request.admin_note, level=AdminActionLevel.high)
410 user.banned_at = None
411 return _user_to_details(session, user)
413 def AddAdminNote(
414 self, request: admin_pb2.AddAdminNoteReq, context: CouchersContext, session: Session
415 ) -> admin_pb2.UserDetails:
416 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
417 if not user: 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true
418 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
419 if not request.admin_note.strip():
420 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_note_cant_be_empty")
421 level = api2adminactionlevel.get(request.level, AdminActionLevel.normal)
422 log_admin_action(session, context, user, "note", note=request.admin_note, level=level)
423 return _user_to_details(session, user)
425 def GetContentReport(
426 self, request: admin_pb2.GetContentReportReq, context: CouchersContext, session: Session
427 ) -> admin_pb2.GetContentReportRes:
428 content_report = session.execute(
429 select(ContentReport).where(ContentReport.id == request.content_report_id)
430 ).scalar_one_or_none()
431 if not content_report:
432 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "content_report_not_found")
433 return admin_pb2.GetContentReportRes(
434 content_report=_content_report_to_pb(content_report),
435 )
437 def GetContentReportsForAuthor(
438 self, request: admin_pb2.GetContentReportsForAuthorReq, context: CouchersContext, session: Session
439 ) -> admin_pb2.GetContentReportsForAuthorRes:
440 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
441 if not user: 441 ↛ 442line 441 didn't jump to line 442 because the condition on line 441 was never true
442 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
443 content_reports = (
444 session.execute(
445 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
446 )
447 .scalars()
448 .all()
449 )
450 return admin_pb2.GetContentReportsForAuthorRes(
451 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
452 )
454 def SendModNote(
455 self, request: admin_pb2.SendModNoteReq, context: CouchersContext, session: Session
456 ) -> admin_pb2.UserDetails:
457 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
458 if not user: 458 ↛ 459line 458 didn't jump to line 459 because the condition on line 458 was never true
459 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
460 session.add(
461 ModNote(
462 user_id=user.id,
463 internal_id=request.internal_id,
464 creator_user_id=context.user_id,
465 note_content=request.content,
466 )
467 )
468 session.flush()
469 notify_user = "No" if request.do_not_notify else "Yes"
470 log_admin_action(
471 session,
472 context,
473 user,
474 "send_mod_note",
475 note=f"Notify user: {notify_user}\n\n{request.content}",
476 )
478 if not request.do_not_notify:
479 notify(
480 session,
481 user_id=user.id,
482 topic_action=NotificationTopicAction.modnote__create,
483 key="",
484 )
486 return _user_to_details(session, user)
488 def MarkUserNeedsLocationUpdate(
489 self, request: admin_pb2.MarkUserNeedsLocationUpdateReq, context: CouchersContext, session: Session
490 ) -> admin_pb2.UserDetails:
491 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
492 if not user: 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true
493 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
494 user.needs_to_update_location = True
495 log_admin_action(
496 session, context, user, "mark_needs_location_update", note="Marked user as needing location update"
497 )
498 return _user_to_details(session, user)
500 def DeleteUser(
501 self, request: admin_pb2.DeleteUserReq, context: CouchersContext, session: Session
502 ) -> admin_pb2.UserDetails:
503 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
504 if not user: 504 ↛ 505line 504 didn't jump to line 505 because the condition on line 504 was never true
505 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
506 user.deleted_at = now()
507 log_admin_action(session, context, user, "delete_user", level=AdminActionLevel.high)
508 return _user_to_details(session, user)
510 def RecoverDeletedUser(
511 self, request: admin_pb2.RecoverDeletedUserReq, context: CouchersContext, session: Session
512 ) -> admin_pb2.UserDetails:
513 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
514 if not user: 514 ↛ 515line 514 didn't jump to line 515 because the condition on line 514 was never true
515 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
516 user.deleted_at = None
517 user.undelete_token = None
518 user.undelete_until = None
519 log_admin_action(session, context, user, "recover_user", level=AdminActionLevel.high)
520 return _user_to_details(session, user)
522 def CreateApiKey(
523 self, request: admin_pb2.CreateApiKeyReq, context: CouchersContext, session: Session
524 ) -> admin_pb2.CreateApiKeyRes:
525 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
526 if not user: 526 ↛ 527line 526 didn't jump to line 527 because the condition on line 526 was never true
527 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
528 token, expiry = create_session(
529 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
530 )
531 log_admin_action(session, context, user, "create_api_key")
533 notify(
534 session,
535 user_id=user.id,
536 topic_action=NotificationTopicAction.api_key__create,
537 key="",
538 data=notification_data_pb2.ApiKeyCreate(
539 api_key=token,
540 expiry=Timestamp_from_datetime(expiry),
541 ),
542 )
544 return admin_pb2.CreateApiKeyRes()
546 def GetChats(
547 self, request: admin_pb2.GetChatsReq, context: CouchersContext, session: Session
548 ) -> admin_pb2.GetChatsRes:
549 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
550 if not user: 550 ↛ 551line 550 didn't jump to line 551 because the condition on line 550 was never true
551 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
553 # Cache for ChatUserInfo to avoid recomputing for the same user
554 user_info_cache = {}
556 def get_chat_user_info(user_id: int) -> admin_pb2.ChatUserInfo:
557 if user_id not in user_info_cache: 557 ↛ 566line 557 didn't jump to line 566 because the condition on line 557 was always true
558 u = session.execute(select(User).where(User.id == user_id)).scalar_one()
559 user_info_cache[user_id] = admin_pb2.ChatUserInfo(
560 user_id=u.id,
561 username=u.username,
562 name=u.name,
563 birthdate=date_to_api(u.birthdate),
564 gender=u.gender,
565 )
566 return user_info_cache[user_id]
568 def message_to_pb(message: Message) -> admin_pb2.ChatMessage:
569 return admin_pb2.ChatMessage(
570 message_id=message.id,
571 author=get_chat_user_info(message.author_id),
572 time=Timestamp_from_datetime(message.time),
573 message_type=message.message_type.name if message.message_type else "",
574 text=message.text or "",
575 host_request_status_target=(
576 message.host_request_status_target.name if message.host_request_status_target else ""
577 ),
578 target=get_chat_user_info(message.target_id) if message.target_id else None,
579 )
581 def get_messages_for_conversation(conversation_id: int) -> list[admin_pb2.ChatMessage]:
582 messages = (
583 session.execute(
584 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
585 )
586 .scalars()
587 .all()
588 )
589 return [message_to_pb(msg) for msg in messages]
591 def get_host_request_pb(host_request: HostRequest) -> admin_pb2.AdminHostRequest:
592 return admin_pb2.AdminHostRequest(
593 host_request_id=host_request.conversation_id,
594 surfer=get_chat_user_info(host_request.initiator_user_id),
595 host=get_chat_user_info(host_request.recipient_user_id),
596 status=host_request.status.name if host_request.status else "",
597 from_date=date_to_api(host_request.from_date),
598 to_date=date_to_api(host_request.to_date),
599 created=Timestamp_from_datetime(host_request.conversation.created),
600 messages=get_messages_for_conversation(host_request.conversation_id),
601 )
603 def get_group_chat_pb(group_chat: GroupChat) -> admin_pb2.AdminGroupChat:
604 subs = (
605 session.execute(
606 select(GroupChatSubscription)
607 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
608 .order_by(GroupChatSubscription.joined.asc())
609 )
610 .scalars()
611 .all()
612 )
613 members = [
614 admin_pb2.GroupChatMember(
615 user=get_chat_user_info(sub.user_id),
616 joined=Timestamp_from_datetime(sub.joined),
617 left=Timestamp_from_datetime(sub.left) if sub.left else None,
618 role=sub.role.name if sub.role else "",
619 )
620 for sub in subs
621 ]
622 return admin_pb2.AdminGroupChat(
623 group_chat_id=group_chat.conversation_id,
624 title=group_chat.title or "",
625 is_dm=group_chat.is_dm,
626 creator=get_chat_user_info(group_chat.creator_id),
627 members=members,
628 messages=get_messages_for_conversation(group_chat.conversation_id),
629 )
631 # Get all host requests for the user
632 host_requests = (
633 session.execute(
634 select(HostRequest)
635 .where(or_(HostRequest.recipient_user_id == user.id, HostRequest.initiator_user_id == user.id))
636 .order_by(HostRequest.conversation_id.desc())
637 )
638 .scalars()
639 .all()
640 )
642 # Get all group chats for the user
643 group_chat_ids = (
644 session.execute(
645 select(GroupChatSubscription.group_chat_id)
646 .where(GroupChatSubscription.user_id == user.id)
647 .order_by(GroupChatSubscription.joined.desc())
648 )
649 .scalars()
650 .all()
651 )
652 group_chats = (
653 session.execute(select(GroupChat).where(GroupChat.conversation_id.in_(group_chat_ids))).scalars().all()
654 )
656 # Build protobuf objects, then sort by latest message time (most recent first)
657 host_request_pbs = [get_host_request_pb(hr) for hr in host_requests]
658 host_request_pbs.sort(key=lambda hr: hr.messages[-1].time.seconds if hr.messages else 0, reverse=True)
660 group_chat_pbs = [get_group_chat_pb(gc) for gc in group_chats]
661 group_chat_pbs.sort(key=lambda gc: gc.messages[-1].time.seconds if gc.messages else 0, reverse=True)
663 return admin_pb2.GetChatsRes(
664 user=get_chat_user_info(user.id),
665 host_requests=host_request_pbs,
666 group_chats=group_chat_pbs,
667 )
669 def DeleteEvent(
670 self, request: admin_pb2.DeleteEventReq, context: CouchersContext, session: Session
671 ) -> empty_pb2.Empty:
672 res = session.execute(
673 select(Event, EventOccurrence)
674 .where(EventOccurrence.id == request.event_id)
675 .where(EventOccurrence.event_id == Event.id)
676 .where(~EventOccurrence.is_deleted)
677 ).one_or_none()
679 if not res: 679 ↛ 680line 679 didn't jump to line 680 because the condition on line 679 was never true
680 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
682 event, occurrence = res
684 occurrence.is_deleted = True
686 queue_job(
687 session,
688 job=generate_event_delete_notifications,
689 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
690 occurrence_id=occurrence.id,
691 ),
692 )
694 return empty_pb2.Empty()
696 def ListUserIds(
697 self, request: admin_pb2.ListUserIdsReq, context: CouchersContext, session: Session
698 ) -> admin_pb2.ListUserIdsRes:
699 start_date = to_aware_datetime(request.start_time)
700 end_date = to_aware_datetime(request.end_time)
702 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
703 next_user_id = int(request.page_token) if request.page_token else 0
705 user_ids = (
706 session.execute(
707 select(User.id)
708 .where(or_(User.id <= next_user_id, to_bool(next_user_id == 0)))
709 .where(User.joined >= start_date)
710 .where(User.joined <= end_date)
711 .order_by(User.id.desc())
712 .limit(page_size + 1)
713 )
714 .scalars()
715 .all()
716 )
718 return admin_pb2.ListUserIdsRes(
719 user_ids=user_ids[:page_size],
720 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
721 )
def EditReferenceText(
    self, request: admin_pb2.EditReferenceTextReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Replace the text of a reference with the stripped ``new_text`` from the request.

    Aborts with NOT_FOUND if no reference has the given id, and with INVALID_ARGUMENT if the new text is empty
    or whitespace-only. The edit is recorded as an admin action against the user who wrote the reference.
    """
    reference_query = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(reference_query).scalar_one_or_none()
    if reference is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")

    cleaned_text = request.new_text.strip()
    if not cleaned_text:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "reference_no_text")
    reference.text = cleaned_text

    # The admin action is attributed to the author of the reference
    author_query = select(User).where(User.id == reference.from_user_id)
    author = session.execute(author_query).scalar_one()
    log_admin_action(session, context, author, "edit_reference", note=f"Edited reference {reference.id}")
    return empty_pb2.Empty()
def DeleteReference(
    self, request: admin_pb2.DeleteReferenceReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Mark a reference as deleted by setting its ``is_deleted`` flag.

    Aborts with NOT_FOUND if no reference has the given id. The deletion is recorded as an admin action against
    the user who wrote the reference.
    """
    reference_query = select(Reference).where(Reference.id == request.reference_id)
    reference = session.execute(reference_query).scalar_one_or_none()
    if reference is None:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "reference_not_found")

    reference.is_deleted = True

    # The admin action is attributed to the author of the reference
    author_query = select(User).where(User.id == reference.from_user_id)
    author = session.execute(author_query).scalar_one()
    log_admin_action(session, context, author, "delete_reference", note=f"Deleted reference {reference.id}")
    return empty_pb2.Empty()
def GetUserReferences(
    self, request: admin_pb2.GetUserReferencesReq, context: CouchersContext, session: Session
) -> admin_pb2.GetUserReferencesRes:
    """
    Return every reference written by a user and every reference written about them.

    Both lists are ordered by descending reference id. Aborts with NOT_FOUND if the user cannot be resolved.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    def newest_first(condition):
        # Shared query shape for both directions; only the filter differs
        query = select(Reference).where(condition).order_by(Reference.id.desc())
        return session.execute(query).scalars().all()

    written_by_user = newest_first(Reference.from_user_id == user.id)
    written_about_user = newest_first(Reference.to_user_id == user.id)

    return admin_pb2.GetUserReferencesRes(
        references_from=[_reference_to_pb(reference) for reference in written_by_user],
        references_to=[_reference_to_pb(reference) for reference in written_about_user],
    )
def GetFriendRequests(
    self, request: admin_pb2.GetFriendRequestsReq, context: CouchersContext, session: Session
) -> admin_pb2.GetFriendRequestsRes:
    """
    Return all friend relationships a user has sent and all they have received.

    Both lists are ordered by descending relationship id. Aborts with NOT_FOUND if the user cannot be resolved.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    # Users appear in many relationships; remember each one so it is loaded only once per call
    known_users: dict[int, admin_pb2.ChatUserInfo] = {}

    def chat_user_info_for(user_id: int) -> admin_pb2.ChatUserInfo:
        info = known_users.get(user_id)
        if info is None:
            row = session.execute(select(User).where(User.id == user_id)).scalar_one()
            info = admin_pb2.ChatUserInfo(
                user_id=row.id,
                username=row.username,
                name=row.name,
                birthdate=date_to_api(row.birthdate),
                gender=row.gender,
            )
            known_users[user_id] = info
        return info

    def to_admin_friend_request(relationship: FriendRelationship) -> admin_pb2.AdminFriendRequest:
        responded_at = relationship.time_responded
        return admin_pb2.AdminFriendRequest(
            friend_request_id=relationship.id,
            from_user=chat_user_info_for(relationship.from_user_id),
            to_user=chat_user_info_for(relationship.to_user_id),
            status=relationship.status.name if relationship.status else "",
            time_sent=Timestamp_from_datetime(relationship.time_sent),
            time_responded=Timestamp_from_datetime(responded_at) if responded_at else None,
            moderation_visibility=relationship.moderation_state.visibility.name,
        )

    def newest_first(condition):
        # Shared query shape for both directions; only the filter differs
        query = select(FriendRelationship).where(condition).order_by(FriendRelationship.id.desc())
        return session.execute(query).scalars().all()

    outgoing = newest_first(FriendRelationship.from_user_id == user.id)
    incoming = newest_first(FriendRelationship.to_user_id == user.id)

    return admin_pb2.GetFriendRequestsRes(
        sent=[to_admin_friend_request(relationship) for relationship in outgoing],
        received=[to_admin_friend_request(relationship) for relationship in incoming],
    )
def EditDiscussion(
    self, request: admin_pb2.EditDiscussionReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Update the title and/or content of a discussion.

    A field is only changed when the corresponding request value is non-empty; the stored value is stripped of
    surrounding whitespace. Aborts with NOT_FOUND if no discussion has the given id.
    """
    lookup = select(Discussion).where(Discussion.id == request.discussion_id)
    discussion = session.execute(lookup).scalar_one_or_none()
    if not discussion:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")

    title, content = request.new_title, request.new_content
    if title:
        discussion.title = title.strip()
    if content:
        discussion.content = content.strip()

    return empty_pb2.Empty()
def EditReply(self, request: admin_pb2.EditReplyReq, context: CouchersContext, session: Session) -> empty_pb2.Empty:
    """
    Overwrite the content of a comment or reply with the stripped ``new_content`` from the request.

    ``reply_id`` is unpacked into a database id and a depth: depth 1 is looked up as a ``Comment`` and depth 2 as
    a ``Reply``. Aborts with NOT_FOUND for any other depth or when no matching row exists.
    """
    database_id, depth = unpack_thread_id(request.reply_id)

    # The depth selects which table the database id refers to
    model = {1: Comment, 2: Reply}.get(depth)

    obj: Comment | Reply | None = None
    if model is not None:
        obj = session.execute(select(model).where(model.id == database_id)).scalar_one_or_none()

    if not obj:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "object_not_found")

    obj.content = request.new_content.strip()
    return empty_pb2.Empty()
def AddUsersToModerationUserList(
    self, request: admin_pb2.AddUsersToModerationUserListReq, context: CouchersContext, session: Session
) -> admin_pb2.AddUsersToModerationUserListRes:
    """
    Put several users on a moderation user list and return the id of that list.

    When the request names no moderation list, a fresh one is created. Aborts with NOT_FOUND if any requested
    user, or the requested list, does not exist.
    """
    # Resolve every requested user up front so that nothing is modified if one of them is missing
    resolved_users = []
    for identifier in request.users:
        found = session.execute(select(User).where(username_or_email_or_id(identifier))).scalar_one_or_none()
        if not found:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
        resolved_users.append(found)

    if not request.moderation_list_id:
        # No list was given: create one and flush it before it is used below
        moderation_user_list = ModerationUserList()
        session.add(moderation_user_list)
        session.flush()
    else:
        moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
        if not moderation_user_list:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")

    for member in resolved_users:
        # Skip the append for users that are already on the list
        if member not in moderation_user_list.users:
            moderation_user_list.users.append(member)
        # The admin action is logged for every requested user, including existing members
        log_admin_action(session, context, member, "add_to_moderation_list")

    return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)
def ListModerationUserLists(
    self, request: admin_pb2.ListModerationUserListsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListModerationUserListsRes:
    """
    Return every moderation user list the given user belongs to, each with the details of all its members.

    Aborts with NOT_FOUND if the user cannot be resolved.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    result_lists = []
    for moderation_list in user.moderation_user_lists:
        member_details = [_user_to_details(session, member) for member in moderation_list.users]
        result_lists.append(
            admin_pb2.ModerationList(
                moderation_list_id=moderation_list.id,
                members=member_details,
            )
        )

    return admin_pb2.ListModerationUserListsRes(moderation_lists=result_lists)
def RemoveUserFromModerationUserList(
    self, request: admin_pb2.RemoveUserFromModerationUserListReq, context: CouchersContext, session: Session
) -> empty_pb2.Empty:
    """
    Take a user off the given moderation user list, deleting the list if that leaves it empty.

    Aborts with NOT_FOUND if the user or the list does not exist, INVALID_ARGUMENT if no list id was supplied,
    and FAILED_PRECONDITION if the user is not on the list.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    list_id = request.moderation_list_id
    if not list_id:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "missing_moderation_user_list_id")

    moderation_user_list = session.get(ModerationUserList, list_id)
    if not moderation_user_list:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_user_list_not_found")

    members = moderation_user_list.users
    if user not in members:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_not_in_the_moderation_user_list")

    members.remove(user)
    log_admin_action(session, context, user, "remove_from_moderation_list")

    # A list with no remaining members is removed entirely
    if not moderation_user_list.users:
        session.delete(moderation_user_list)

    return empty_pb2.Empty()
def CreateAccountDeletionLink(
    self, request: admin_pb2.CreateAccountDeletionLinkReq, context: CouchersContext, session: Session
) -> admin_pb2.CreateAccountDeletionLinkRes:
    """
    Create an account deletion token for a user and return the confirmation URL built from it.

    The token expires two hours after creation. The action is logged at the ``high`` admin action level.
    Aborts with NOT_FOUND if the user cannot be resolved.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    secret = urlsafe_secure_token()
    expires_at = now() + timedelta(hours=2)
    deletion_token = AccountDeletionToken(token=secret, user_id=user.id, expiry=expires_at)
    session.add(deletion_token)

    log_admin_action(session, context, user, "create_account_deletion_link", level=AdminActionLevel.high)

    confirm_url = urls.delete_account_link(account_deletion_token=deletion_token.token)
    return admin_pb2.CreateAccountDeletionLinkRes(account_deletion_confirm_url=confirm_url)
def AccessStats(
    self, request: admin_pb2.AccessStatsReq, context: CouchersContext, session: Session
) -> admin_pb2.AccessStatsRes:
    """
    Summarise a user's recorded activity per (IP address, user agent) pair within a time window.

    The window defaults to the 90 days before now when ``start_time`` is unset, and ends now when ``end_time``
    is unset. Each entry carries the summed API calls, the number of activity periods, the first and last
    period seen, plus ASN, approximate location and parsed user-agent details. Entries are ordered by most
    recent period first. Aborts with NOT_FOUND if the user cannot be resolved.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    if request.HasField("start_time"):
        window_start = to_aware_datetime(request.start_time)
    else:
        window_start = now() - timedelta(days=90)

    if request.HasField("end_time"):
        window_end = to_aware_datetime(request.end_time)
    else:
        window_end = now()

    activity_query = (
        select(
            UserActivity.ip_address,
            UserActivity.user_agent,
            func.sum(UserActivity.api_calls),
            func.count(UserActivity.period),
            func.min(UserActivity.period),
            func.max(UserActivity.period),
        )
        .where(UserActivity.user_id == user.id)
        .where(UserActivity.period >= window_start)
        .where(UserActivity.period <= window_end)
        .order_by(func.max(UserActivity.period).desc())
        .group_by(UserActivity.ip_address, UserActivity.user_agent)
    )
    grouped_activity = session.execute(activity_query).all()

    out = admin_pb2.AccessStatsRes()

    for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in grouped_activity:
        ip_text = None if ip_address is None else str(ip_address)
        parsed_agent = user_agents_parse(user_agent or "")

        # The ASN lookup may yield nothing, in which case all three ASN fields stay unset
        asn_info = geoip_asn(ip_text)
        if asn_info:
            asn, asorg, asnetwork = str(asn_info[0]), str(asn_info[1]), str(asn_info[2])
        else:
            asn = asorg = asnetwork = None

        stat = admin_pb2.AccessStat(
            ip_address=ip_text,
            asn=asn,
            asorg=asorg,
            asnetwork=asnetwork,
            user_agent=user_agent,
            operating_system=parsed_agent.os.family,
            browser=parsed_agent.browser.family,
            device=parsed_agent.device.family,
            approximate_location=geoip_approximate_location(ip_text) or "Unknown",
            api_call_count=api_call_count,
            periods_count=periods_count,
            first_seen=Timestamp_from_datetime(first_seen),
            last_seen=Timestamp_from_datetime(last_seen),
        )
        out.stats.append(stat)

    return out
def SetLastDonated(
    self, request: admin_pb2.SetLastDonatedReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """
    Set a user's ``last_donated`` time, or clear it when the request leaves ``last_donated`` unset.

    Logs the change as an admin action and returns the updated user details. Aborts with NOT_FOUND if the user
    cannot be resolved.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    user.last_donated = to_aware_datetime(request.last_donated) if request.HasField("last_donated") else None

    log_admin_action(session, context, user, "set_last_donated")
    return _user_to_details(session, user)
def CreateAdminTag(
    self, request: admin_pb2.CreateAdminTagReq, context: CouchersContext, session: Session
) -> admin_pb2.AdminTagInfo:
    """
    Create a new admin tag from the stripped ``tag`` in the request and return its id and text.

    Aborts with INVALID_ARGUMENT if the tag is empty or whitespace-only, and with ALREADY_EXISTS if a tag with
    the same text is already stored.
    """
    tag_text = request.tag.strip()
    if not tag_text:
        context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin_tag_cant_be_empty")

    duplicate = session.execute(select(AdminTag).where(AdminTag.tag == tag_text)).scalar_one_or_none()
    if duplicate:
        context.abort_with_error_code(grpc.StatusCode.ALREADY_EXISTS, "admin_tag_already_exists")

    created = AdminTag(tag=tag_text)
    session.add(created)
    # Flush before reading the new tag's id for the response
    session.flush()
    return admin_pb2.AdminTagInfo(admin_tag_id=created.id, tag=created.tag)
def ListAdminTags(
    self, request: admin_pb2.ListAdminTagsReq, context: CouchersContext, session: Session
) -> admin_pb2.ListAdminTagsRes:
    """Return all admin tags, ordered by their tag text."""
    all_tags = session.execute(select(AdminTag).order_by(AdminTag.tag)).scalars().all()

    tag_infos = []
    for admin_tag in all_tags:
        tag_infos.append(admin_pb2.AdminTagInfo(admin_tag_id=admin_tag.id, tag=admin_tag.tag))

    return admin_pb2.ListAdminTagsRes(tags=tag_infos)
def AddAdminTagToUser(
    self, request: admin_pb2.AddAdminTagToUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """
    Attach an existing admin tag to a user and return the updated user details.

    Aborts with NOT_FOUND if the user or the tag does not exist, and with FAILED_PRECONDITION if the user
    already has the tag. The change is logged as an admin action.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none()
    if not admin_tag:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin_tag_not_found")

    link_query = (
        select(UserAdminTag)
        .where(UserAdminTag.user_id == user.id)
        .where(UserAdminTag.admin_tag_id == admin_tag.id)
    )
    already_linked = session.execute(link_query).scalar_one_or_none()
    if already_linked:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_already_has_admin_tag")

    new_link = UserAdminTag(user_id=user.id, admin_tag_id=admin_tag.id)
    session.add(new_link)
    session.flush()

    log_admin_action(session, context, user, "add_tag", tag=request.tag)
    return _user_to_details(session, user)
def RemoveAdminTagFromUser(
    self, request: admin_pb2.RemoveAdminTagFromUserReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """
    Detach an admin tag from a user and return the updated user details.

    Aborts with NOT_FOUND if the user or the tag does not exist, and with FAILED_PRECONDITION if the user does
    not have the tag. The change is logged as an admin action.
    """
    user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none()
    if not admin_tag:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin_tag_not_found")

    link_query = (
        select(UserAdminTag)
        .where(UserAdminTag.user_id == user.id)
        .where(UserAdminTag.admin_tag_id == admin_tag.id)
    )
    user_admin_tag = session.execute(link_query).scalar_one_or_none()
    if not user_admin_tag:
        context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "user_does_not_have_admin_tag")

    session.delete(user_admin_tag)
    session.flush()

    log_admin_action(session, context, user, "remove_tag", tag=request.tag)
    return _user_to_details(session, user)
def SetModScore(
    self, request: admin_pb2.SetModScoreReq, context: CouchersContext, session: Session
) -> admin_pb2.UserDetails:
    """
    Overwrite a user's ``mod_score`` with the value from the request and return the updated user details.

    The new score is included in the note of the logged admin action. Aborts with NOT_FOUND if the user cannot
    be resolved.
    """
    user_query = select(User).where(username_or_email_or_id(request.user))
    user = session.execute(user_query).scalar_one_or_none()
    if not user:
        context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

    new_score = request.mod_score
    user.mod_score = new_score

    log_admin_action(session, context, user, "set_mod_score", note=f"mod_score={new_score}")
    return _user_to_details(session, user)