Coverage for app/backend/src/couchers/servicers/admin.py: 79%
629 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import json
2import logging
3from datetime import UTC, datetime, timedelta
4from typing import Any
6import grpc
7from google.protobuf import empty_pb2
8from google.protobuf.wrappers_pb2 import Int64Value
9from sqlalchemy import select, tuple_
10from sqlalchemy.orm import Session, aliased, selectinload
11from sqlalchemy.sql import and_, func, or_
12from user_agents import parse as user_agents_parse
14from couchers import urls
15from couchers.context import CouchersContext
16from couchers.crypto import urlsafe_secure_token
17from couchers.helpers.badges import user_add_badge, user_remove_badge
18from couchers.helpers.geoip import geoip_approximate_location, geoip_asn
19from couchers.helpers.strong_verification import get_strong_verification_fields
20from couchers.helpers.upload_uses import UploadUseType, get_upload_uses_for_keys
21from couchers.jobs.enqueue import queue_job
22from couchers.models import (
23 AccountDeletionToken,
24 AdminAction,
25 AdminActionLevel,
26 AdminTag,
27 Comment,
28 ContentReport,
29 Discussion,
30 Event,
31 EventOccurrence,
32 FriendRelationship,
33 GroupChat,
34 GroupChatSubscription,
35 HostRequest,
36 LanguageAbility,
37 Message,
38 ModerationUserList,
39 ModerationVisibility,
40 ModNote,
41 NonvisibleUserAccess,
42 NonvisibleUserAccessType,
43 NonvisibleUserState,
44 OTAPackage,
45 OTAPlatform,
46 Reference,
47 Reply,
48 User,
49 UserActivity,
50 UserAdminTag,
51 UserBadge,
52)
53from couchers.models.discussions import (
54 CommentVersion,
55 ContentChangeType,
56 DiscussionVersion,
57 ReplyVersion,
58)
59from couchers.models.notifications import NotificationTopicAction
60from couchers.models.uploads import Upload, has_avatar_photo_expression
61from couchers.notifications.notify import notify
62from couchers.proto import admin_pb2, admin_pb2_grpc, api_pb2, notification_data_pb2
63from couchers.proto.internal import jobs_pb2
64from couchers.resources import get_badge_dict
65from couchers.servicers.api import user_model_to_pb
66from couchers.servicers.auth import create_session
67from couchers.servicers.bugs import _fetch_signed_manifest, _native_ota_manifest_url
68from couchers.servicers.events import generate_event_delete_notifications
69from couchers.servicers.moderation import bulk_set_user_content_visibility
70from couchers.servicers.threads import unpack_thread_id
71from couchers.sql import to_bool, username_or_email_or_id
72from couchers.utils import Timestamp_from_datetime, date_to_api, now, parse_date, to_aware_datetime
74logger = logging.getLogger(__name__)
76MAX_PAGINATION_LENGTH = 250
79adminactionlevel2api = {
80 AdminActionLevel.trace: admin_pb2.ADMIN_ACTION_LEVEL_TRACE,
81 AdminActionLevel.debug: admin_pb2.ADMIN_ACTION_LEVEL_DEBUG,
82 AdminActionLevel.normal: admin_pb2.ADMIN_ACTION_LEVEL_NORMAL,
83 AdminActionLevel.high: admin_pb2.ADMIN_ACTION_LEVEL_HIGH,
84}
86api2adminactionlevel = {
87 admin_pb2.ADMIN_ACTION_LEVEL_TRACE: AdminActionLevel.trace,
88 admin_pb2.ADMIN_ACTION_LEVEL_DEBUG: AdminActionLevel.debug,
89 admin_pb2.ADMIN_ACTION_LEVEL_NORMAL: AdminActionLevel.normal,
90 admin_pb2.ADMIN_ACTION_LEVEL_HIGH: AdminActionLevel.high,
91}
93uploadusetype2api = {
94 None: admin_pb2.UPLOAD_USE_TYPE_UNSPECIFIED,
95 UploadUseType.profile_gallery_photo: admin_pb2.UPLOAD_USE_TYPE_PROFILE_GALLERY_PHOTO,
96 UploadUseType.profile_gallery_photo_avatar: admin_pb2.UPLOAD_USE_TYPE_PROFILE_GALLERY_PHOTO_AVATAR,
97 UploadUseType.event: admin_pb2.UPLOAD_USE_TYPE_EVENT,
98 UploadUseType.page: admin_pb2.UPLOAD_USE_TYPE_PAGE,
99}
101otaplatform2api = {
102 None: admin_pb2.OTA_PLATFORM_UNSPECIFIED,
103 OTAPlatform.ios: admin_pb2.OTA_PLATFORM_IOS,
104 OTAPlatform.android: admin_pb2.OTA_PLATFORM_ANDROID,
105}
107api2otaplatform = {
108 admin_pb2.OTA_PLATFORM_UNSPECIFIED: None,
109 admin_pb2.OTA_PLATFORM_IOS: OTAPlatform.ios,
110 admin_pb2.OTA_PLATFORM_ANDROID: OTAPlatform.android,
111}
113nonvisibleuseraccesstype2api = {
114 None: admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_UNSPECIFIED,
115 NonvisibleUserAccessType.login_attempt: admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_LOGIN_ATTEMPT,
116 NonvisibleUserAccessType.profile_view: admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_PROFILE_VIEW,
117 NonvisibleUserAccessType.ghost_served: admin_pb2.NONVISIBLE_USER_ACCESS_TYPE_GHOST_SERVED,
118}
120nonvisibleuserstate2api = {
121 None: admin_pb2.NONVISIBLE_USER_STATE_UNSPECIFIED,
122 NonvisibleUserState.banned: admin_pb2.NONVISIBLE_USER_STATE_BANNED,
123 NonvisibleUserState.shadowed: admin_pb2.NONVISIBLE_USER_STATE_SHADOWED,
124 NonvisibleUserState.deleted: admin_pb2.NONVISIBLE_USER_STATE_DELETED,
125}
128def log_admin_action(
129 session: Session,
130 context: CouchersContext,
131 target_user: User,
132 action_type: str,
133 note: str | None = None,
134 data: object | None = None,
135 tag: str | None = None,
136 level: AdminActionLevel = AdminActionLevel.normal,
137) -> AdminAction:
138 action = AdminAction(
139 admin_user_id=context.user_id,
140 target_user_id=target_user.id,
141 action_type=action_type,
142 level=level,
143 note=note,
144 data=data,
145 tag=tag,
146 )
147 session.add(action)
148 session.flush()
149 return action
152def _live_ota_package_ids(session: Session) -> set[int]:
153 # The live package per (platform, fingerprint) is the newest non-banned one by manifest_created_at,
154 # matching what GetNativeUpdateManifest resolves. DISTINCT ON picks the row with the leading ORDER BY
155 # value per (platform, fingerprint) group in a single index-friendly query.
156 return set(
157 session.scalars(
158 select(OTAPackage.id)
159 .where(OTAPackage.banned_at.is_(None))
160 .distinct(OTAPackage.platform, OTAPackage.fingerprint)
161 .order_by(
162 OTAPackage.platform,
163 OTAPackage.fingerprint,
164 OTAPackage.manifest_created_at.desc(),
165 OTAPackage.id.desc(),
166 )
167 )
168 )
171def _extract_ota_manifest(body: bytes) -> dict[str, Any] | None:
172 # The manifest object is the JSON in the "manifest" part of the signed multipart/mixed body.
173 marker = body.find(b'name="manifest"')
174 if marker == -1:
175 return None
176 body_start = body.find(b"\r\n\r\n", marker)
177 if body_start == -1: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true
178 return None
179 body_end = body.find(b"\r\n--", body_start + 4)
180 if body_end == -1: 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true
181 return None
182 try:
183 manifest = json.loads(body[body_start + 4 : body_end])
184 except json.JSONDecodeError:
185 return None
186 return manifest if isinstance(manifest, dict) else None
189def _ota_package_to_pb(package: OTAPackage, live_ids: set[int]) -> admin_pb2.OTAPackage:
190 return admin_pb2.OTAPackage(
191 ota_package_id=package.id,
192 created=Timestamp_from_datetime(package.created),
193 creator_user_id=package.creator_user_id,
194 platform=otaplatform2api[package.platform],
195 fingerprint=package.fingerprint,
196 version=package.version,
197 manifest_created_at=Timestamp_from_datetime(package.manifest_created_at),
198 manifest_id=package.manifest_id,
199 banned=package.banned_at is not None,
200 banned_at=Timestamp_from_datetime(package.banned_at) if package.banned_at else None,
201 banned_by_user_id=package.banned_by_user_id or 0,
202 banned_reason=package.banned_reason or "",
203 live=package.id in live_ids,
204 )
207def _user_to_details(session: Session, user: User) -> admin_pb2.UserDetails:
208 # Query admin actions for this user
209 actions = session.execute(
210 select(AdminAction, User.username)
211 .join(User, AdminAction.admin_user_id == User.id)
212 .where(AdminAction.target_user_id == user.id)
213 .order_by(AdminAction.created.asc())
214 ).all()
216 action_pbs = []
217 for action, admin_username in actions:
218 action_pbs.append(
219 admin_pb2.AdminActionLog(
220 admin_action_id=action.id,
221 created=Timestamp_from_datetime(action.created),
222 admin_user_id=action.admin_user_id,
223 admin_username=admin_username,
224 action_type=action.action_type,
225 level=adminactionlevel2api[action.level],
226 note=action.note or "",
227 data=json.dumps(action.data) if action.data is not None else "",
228 tag=action.tag or "",
229 target_user_id=action.target_user_id,
230 target_username=user.username,
231 )
232 )
234 # Query admin tags
235 admin_tags = (
236 session.execute(
237 select(AdminTag.tag)
238 .join(UserAdminTag, UserAdminTag.admin_tag_id == AdminTag.id)
239 .where(UserAdminTag.user_id == user.id)
240 .order_by(AdminTag.tag)
241 )
242 .scalars()
243 .all()
244 )
246 last_mod_note_acknowledged = session.execute(
247 select(func.max(ModNote.acknowledged)).where(ModNote.user_id == user.id)
248 ).scalar()
250 return admin_pb2.UserDetails(
251 user_id=user.id,
252 username=user.username,
253 name=user.name,
254 email=user.email,
255 gender=user.gender,
256 birthdate=date_to_api(user.birthdate),
257 banned=user.banned_at is not None,
258 deleted=user.deleted_at is not None,
259 shadowed=user.shadowed_at is not None,
260 do_not_email=user.do_not_email,
261 badges=[badge.badge_id for badge in user.badges],
262 **get_strong_verification_fields(session, user),
263 has_passport_sex_gender_exception=user.has_passport_sex_gender_exception,
264 pending_mod_notes_count=user.mod_notes.where(ModNote.is_pending).count(),
265 acknowledged_mod_notes_count=user.mod_notes.where(~ModNote.is_pending).count(),
266 last_mod_note_acknowledged=(
267 Timestamp_from_datetime(last_mod_note_acknowledged) if last_mod_note_acknowledged else None
268 ),
269 admin_actions=action_pbs,
270 admin_tags=list(admin_tags),
271 mod_score=user.mod_score,
272 )
275def _content_report_to_pb(content_report: ContentReport) -> admin_pb2.ContentReport:
276 return admin_pb2.ContentReport(
277 content_report_id=content_report.id,
278 time=Timestamp_from_datetime(content_report.time),
279 reporting_user_id=content_report.reporting_user_id,
280 author_user_id=content_report.author_user_id,
281 reason=content_report.reason,
282 description=content_report.description,
283 content_ref=content_report.content_ref,
284 user_agent=content_report.user_agent,
285 page=content_report.page,
286 )
289def _reference_to_pb(reference: Reference) -> admin_pb2.AdminReference:
290 return admin_pb2.AdminReference(
291 reference_id=reference.id,
292 from_user_id=reference.from_user_id,
293 to_user_id=reference.to_user_id,
294 reference_type=reference.reference_type.name,
295 text=reference.text,
296 private_text=reference.private_text or "",
297 time=Timestamp_from_datetime(reference.time),
298 host_request_id=reference.host_request_id or 0,
299 rating=reference.rating,
300 was_appropriate=reference.was_appropriate,
301 )
304class Admin(admin_pb2_grpc.AdminServicer):
305 def GetUserDetails(
306 self, request: admin_pb2.GetUserDetailsReq, context: CouchersContext, session: Session
307 ) -> admin_pb2.UserDetails:
308 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
309 if not user: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true
310 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
311 return _user_to_details(session, user)
313 def GetUser(self, request: admin_pb2.GetUserReq, context: CouchersContext, session: Session) -> api_pb2.User:
314 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
315 if not user: 315 ↛ 316line 315 didn't jump to line 316 because the condition on line 315 was never true
316 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
317 return user_model_to_pb(user, session, context, is_admin_see_ghosts=True)
319 def SearchUsers(
320 self, request: admin_pb2.SearchUsersReq, context: CouchersContext, session: Session
321 ) -> admin_pb2.SearchUsersRes:
322 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
323 next_user_id = int(request.page_token) if request.page_token else 0
324 statement = select(User)
325 if request.username: 325 ↛ 326line 325 didn't jump to line 326 because the condition on line 325 was never true
326 statement = statement.where(User.username.ilike(request.username))
327 if request.email: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true
328 statement = statement.where(User.email.ilike(request.email))
329 if request.name: 329 ↛ 330line 329 didn't jump to line 330 because the condition on line 329 was never true
330 statement = statement.where(User.name.ilike(request.name))
331 if request.admin_action_log:
332 statement = statement.where(
333 User.id.in_(select(AdminAction.target_user_id).where(AdminAction.note.ilike(request.admin_action_log)))
334 )
335 if request.city: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true
336 statement = statement.where(User.city.ilike(request.city))
337 if request.min_user_id: 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true
338 statement = statement.where(User.id >= request.min_user_id)
339 if request.max_user_id: 339 ↛ 340line 339 didn't jump to line 340 because the condition on line 339 was never true
340 statement = statement.where(User.id <= request.max_user_id)
341 if request.min_birthdate: 341 ↛ 342line 341 didn't jump to line 342 because the condition on line 341 was never true
342 statement = statement.where(User.birthdate >= parse_date(request.min_birthdate))
343 if request.max_birthdate: 343 ↛ 344line 343 didn't jump to line 344 because the condition on line 343 was never true
344 statement = statement.where(User.birthdate <= parse_date(request.max_birthdate))
345 if request.genders: 345 ↛ 346line 345 didn't jump to line 346 because the condition on line 345 was never true
346 statement = statement.where(User.gender.in_(request.genders))
347 if request.min_joined_date: 347 ↛ 348line 347 didn't jump to line 348 because the condition on line 347 was never true
348 statement = statement.where(User.joined >= parse_date(request.min_joined_date))
349 if request.max_joined_date: 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true
350 statement = statement.where(User.joined <= parse_date(request.max_joined_date))
351 if request.min_last_active_date: 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true
352 statement = statement.where(User.last_active >= parse_date(request.min_last_active_date))
353 if request.max_last_active_date: 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true
354 statement = statement.where(User.last_active <= parse_date(request.max_last_active_date))
355 if request.genders: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true
356 statement = statement.where(User.gender.in_(request.genders))
357 if request.language_codes: 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true
358 statement = statement.join(
359 LanguageAbility,
360 and_(LanguageAbility.user_id == User.id, LanguageAbility.language_code.in_(request.language_codes)),
361 )
362 if request.HasField("is_deleted"): 362 ↛ 363line 362 didn't jump to line 363 because the condition on line 362 was never true
363 statement = statement.where((User.deleted_at != None) == request.is_deleted.value)
364 if request.HasField("is_banned"): 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true
365 statement = statement.where((User.banned_at != None) == request.is_banned.value)
366 if request.HasField("is_shadowed"): 366 ↛ 367line 366 didn't jump to line 367 because the condition on line 366 was never true
367 statement = statement.where((User.shadowed_at != None) == request.is_shadowed.value)
368 if request.HasField("has_avatar"): 368 ↛ 369line 368 didn't jump to line 369 because the condition on line 368 was never true
369 statement = statement.where(has_avatar_photo_expression(User) == request.has_avatar.value)
370 if request.admin_tags:
371 for tag_name in request.admin_tags:
372 statement = statement.where(
373 User.id.in_(
374 select(UserAdminTag.user_id)
375 .join(AdminTag, UserAdminTag.admin_tag_id == AdminTag.id)
376 .where(AdminTag.tag == tag_name)
377 )
378 )
379 users = (
380 session.execute(
381 statement.where(User.id >= next_user_id)
382 .order_by(User.id)
383 .limit(page_size + 1)
384 .options(selectinload(User.badges))
385 )
386 .scalars()
387 .all()
388 )
389 logger.info(users)
390 return admin_pb2.SearchUsersRes(
391 users=[_user_to_details(session, user) for user in users[:page_size]],
392 next_page_token=str(users[-1].id) if len(users) > page_size else None,
393 )
395 def ChangeUserGender(
396 self, request: admin_pb2.ChangeUserGenderReq, context: CouchersContext, session: Session
397 ) -> admin_pb2.UserDetails:
398 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
399 if not user: 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true
400 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
401 old_gender = user.gender
402 user.gender = request.gender
403 log_admin_action(
404 session, context, user, "change_gender", note=f"Changed from '{old_gender}' to '{request.gender}'"
405 )
406 session.commit()
408 notify(
409 session,
410 user_id=user.id,
411 topic_action=NotificationTopicAction.gender__change,
412 key="",
413 data=notification_data_pb2.GenderChange(
414 gender=request.gender,
415 ),
416 )
418 return _user_to_details(session, user)
420 def ChangeUserBirthdate(
421 self, request: admin_pb2.ChangeUserBirthdateReq, context: CouchersContext, session: Session
422 ) -> admin_pb2.UserDetails:
423 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
424 if not user: 424 ↛ 425line 424 didn't jump to line 425 because the condition on line 424 was never true
425 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
426 if not (birthdate := parse_date(request.birthdate)): 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true
427 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "invalid_birthdate")
429 old_birthdate = user.birthdate
430 user.birthdate = birthdate
431 log_admin_action(
432 session, context, user, "change_birthdate", note=f"Changed from {old_birthdate} to {request.birthdate}"
433 )
434 session.commit()
436 notify(
437 session,
438 user_id=user.id,
439 topic_action=NotificationTopicAction.birthdate__change,
440 key="",
441 data=notification_data_pb2.BirthdateChange(
442 birthdate=request.birthdate,
443 ),
444 )
446 return _user_to_details(session, user)
448 def AddBadge(
449 self, request: admin_pb2.AddBadgeReq, context: CouchersContext, session: Session
450 ) -> admin_pb2.UserDetails:
451 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
452 if not user: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true
453 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
455 badge = get_badge_dict().get(request.badge_id)
456 if not badge:
457 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")
459 if not badge.admin_editable:
460 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:cannot_edit_badge")
462 if badge.id in [b.badge_id for b in user.badges]:
463 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_already_has_badge")
465 user_add_badge(session, user.id, request.badge_id)
466 log_admin_action(session, context, user, "add_badge", note=f"Added badge {request.badge_id}")
468 return _user_to_details(session, user)
470 def RemoveBadge(
471 self, request: admin_pb2.RemoveBadgeReq, context: CouchersContext, session: Session
472 ) -> admin_pb2.UserDetails:
473 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
474 if not user: 474 ↛ 475line 474 didn't jump to line 475 because the condition on line 474 was never true
475 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
477 badge = get_badge_dict().get(request.badge_id)
478 if not badge: 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true
479 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "badge_not_found")
481 if not badge.admin_editable: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:cannot_edit_badge")
484 user_badge = session.execute(
485 select(UserBadge).where(UserBadge.user_id == user.id, UserBadge.badge_id == badge.id)
486 ).scalar_one_or_none()
487 if not user_badge:
488 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_does_not_have_badge")
490 user_remove_badge(session, user.id, request.badge_id)
491 log_admin_action(session, context, user, "remove_badge", note=f"Removed badge {request.badge_id}")
493 return _user_to_details(session, user)
495 def SetPassportSexGenderException(
496 self, request: admin_pb2.SetPassportSexGenderExceptionReq, context: CouchersContext, session: Session
497 ) -> admin_pb2.UserDetails:
498 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
499 if not user: 499 ↛ 500line 499 didn't jump to line 500 because the condition on line 499 was never true
500 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
501 old_exception = user.has_passport_sex_gender_exception
502 user.has_passport_sex_gender_exception = request.passport_sex_gender_exception
503 log_admin_action(
504 session,
505 context,
506 user,
507 "set_passport_sex_gender_exception",
508 note=f"Changed from {old_exception} to {request.passport_sex_gender_exception}",
509 )
510 return _user_to_details(session, user)
512 def BanUser(
513 self, request: admin_pb2.BanUserReq, context: CouchersContext, session: Session
514 ) -> admin_pb2.UserDetails:
515 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
516 if not user: 516 ↛ 517line 516 didn't jump to line 517 because the condition on line 516 was never true
517 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
518 if not request.admin_note.strip(): 518 ↛ 519line 518 didn't jump to line 519 because the condition on line 518 was never true
519 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_cant_be_empty")
520 log_admin_action(session, context, user, "ban", note=request.admin_note, level=AdminActionLevel.high)
521 user.banned_at = now()
522 return _user_to_details(session, user)
524 def UnbanUser(
525 self, request: admin_pb2.UnbanUserReq, context: CouchersContext, session: Session
526 ) -> admin_pb2.UserDetails:
527 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
528 if not user: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true
529 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
530 if not request.admin_note.strip(): 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true
531 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_cant_be_empty")
532 log_admin_action(session, context, user, "unban", note=request.admin_note, level=AdminActionLevel.high)
533 user.banned_at = None
534 return _user_to_details(session, user)
536 def ShadowUser(
537 self, request: admin_pb2.ShadowUserReq, context: CouchersContext, session: Session
538 ) -> admin_pb2.UserDetails:
539 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
540 if not user: 540 ↛ 541line 540 didn't jump to line 541 because the condition on line 540 was never true
541 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
542 if not request.admin_note.strip():
543 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_cant_be_empty")
544 log_admin_action(session, context, user, "shadow", note=request.admin_note, level=AdminActionLevel.high)
545 user.shadowed_at = now()
546 # Bulk-shadow all UMS-governed content authored by this user so existing visible content is hidden too
547 bulk_set_user_content_visibility(
548 session=session,
549 user=user,
550 new_visibility=ModerationVisibility.shadowed,
551 moderator_user_id=context.user_id,
552 reason=f"User {user.id} shadowed: {request.admin_note}",
553 )
554 return _user_to_details(session, user)
556 def UnshadowUser(
557 self, request: admin_pb2.UnshadowUserReq, context: CouchersContext, session: Session
558 ) -> admin_pb2.UserDetails:
559 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
560 if not user: 560 ↛ 561line 560 didn't jump to line 561 because the condition on line 560 was never true
561 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
562 if not request.admin_note.strip(): 562 ↛ 563line 562 didn't jump to line 563 because the condition on line 562 was never true
563 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_cant_be_empty")
564 log_admin_action(session, context, user, "unshadow", note=request.admin_note, level=AdminActionLevel.high)
565 user.shadowed_at = None
566 # Sweep content shadowed by the cascade back to visible; leave hidden/unlisted content where moderators put it
567 bulk_set_user_content_visibility(
568 session=session,
569 user=user,
570 new_visibility=ModerationVisibility.visible,
571 moderator_user_id=context.user_id,
572 from_visibilities={ModerationVisibility.shadowed},
573 reason=f"User {user.id} unshadowed: {request.admin_note}",
574 )
575 return _user_to_details(session, user)
577 def AddAdminNote(
578 self, request: admin_pb2.AddAdminNoteReq, context: CouchersContext, session: Session
579 ) -> admin_pb2.UserDetails:
580 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
581 if not user: 581 ↛ 582line 581 didn't jump to line 582 because the condition on line 581 was never true
582 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
583 has_note = bool(request.admin_note.strip())
584 has_data = bool(request.data.strip())
585 if has_note == has_data:
586 context.abort_with_error_code(
587 grpc.StatusCode.INVALID_ARGUMENT, "admin:note_requires_exactly_one_of_note_or_data"
588 )
589 data = None
590 if has_data:
591 try:
592 data = json.loads(request.data)
593 except json.JSONDecodeError:
594 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:note_data_must_be_valid_json")
595 level = api2adminactionlevel.get(request.level, AdminActionLevel.normal)
596 log_admin_action(
597 session,
598 context,
599 user,
600 "note",
601 note=request.admin_note if has_note else None,
602 data=data,
603 level=level,
604 )
605 return _user_to_details(session, user)
607 def GetContentReport(
608 self, request: admin_pb2.GetContentReportReq, context: CouchersContext, session: Session
609 ) -> admin_pb2.GetContentReportRes:
610 content_report = session.execute(
611 select(ContentReport).where(ContentReport.id == request.content_report_id)
612 ).scalar_one_or_none()
613 if not content_report:
614 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:content_report_not_found")
615 return admin_pb2.GetContentReportRes(
616 content_report=_content_report_to_pb(content_report),
617 )
619 def GetContentReportsForAuthor(
620 self, request: admin_pb2.GetContentReportsForAuthorReq, context: CouchersContext, session: Session
621 ) -> admin_pb2.GetContentReportsForAuthorRes:
622 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
623 if not user: 623 ↛ 624line 623 didn't jump to line 624 because the condition on line 623 was never true
624 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
625 content_reports = (
626 session.execute(
627 select(ContentReport).where(ContentReport.author_user_id == user.id).order_by(ContentReport.id.desc())
628 )
629 .scalars()
630 .all()
631 )
632 return admin_pb2.GetContentReportsForAuthorRes(
633 content_reports=[_content_report_to_pb(content_report) for content_report in content_reports],
634 )
636 def SendModNote(
637 self, request: admin_pb2.SendModNoteReq, context: CouchersContext, session: Session
638 ) -> admin_pb2.UserDetails:
639 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
640 if not user: 640 ↛ 641line 640 didn't jump to line 641 because the condition on line 640 was never true
641 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
642 session.add(
643 ModNote(
644 user_id=user.id,
645 internal_id=request.internal_id,
646 creator_user_id=context.user_id,
647 note_content=request.content,
648 )
649 )
650 session.flush()
651 notify_user = "No" if request.do_not_notify else "Yes"
652 log_admin_action(
653 session,
654 context,
655 user,
656 "send_mod_note",
657 note=f"Notify user: {notify_user}\n\n{request.content}",
658 )
660 if not request.do_not_notify:
661 notify(
662 session,
663 user_id=user.id,
664 topic_action=NotificationTopicAction.modnote__create,
665 key="",
666 )
668 return _user_to_details(session, user)
670 def MarkUserNeedsLocationUpdate(
671 self, request: admin_pb2.MarkUserNeedsLocationUpdateReq, context: CouchersContext, session: Session
672 ) -> admin_pb2.UserDetails:
673 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
674 if not user: 674 ↛ 675line 674 didn't jump to line 675 because the condition on line 674 was never true
675 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
676 user.needs_to_update_location = True
677 log_admin_action(
678 session, context, user, "mark_needs_location_update", note="Marked user as needing location update"
679 )
680 return _user_to_details(session, user)
682 def DeleteUser(
683 self, request: admin_pb2.DeleteUserReq, context: CouchersContext, session: Session
684 ) -> admin_pb2.UserDetails:
685 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
686 if not user: 686 ↛ 687line 686 didn't jump to line 687 because the condition on line 686 was never true
687 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
688 user.deleted_at = now()
689 log_admin_action(session, context, user, "delete_user", level=AdminActionLevel.high)
690 return _user_to_details(session, user)
692 def RecoverDeletedUser(
693 self, request: admin_pb2.RecoverDeletedUserReq, context: CouchersContext, session: Session
694 ) -> admin_pb2.UserDetails:
695 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
696 if not user: 696 ↛ 697line 696 didn't jump to line 697 because the condition on line 696 was never true
697 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
698 user.deleted_at = None
699 user.undelete_token = None
700 user.undelete_until = None
701 log_admin_action(session, context, user, "recover_user", level=AdminActionLevel.high)
702 return _user_to_details(session, user)
704 def CreateApiKey(
705 self, request: admin_pb2.CreateApiKeyReq, context: CouchersContext, session: Session
706 ) -> admin_pb2.CreateApiKeyRes:
707 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
708 if not user: 708 ↛ 709line 708 didn't jump to line 709 because the condition on line 708 was never true
709 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
710 token, expiry = create_session(
711 context, session, user, long_lived=True, is_api_key=True, duration=timedelta(days=365), set_cookie=False
712 )
713 log_admin_action(session, context, user, "create_api_key")
715 notify(
716 session,
717 user_id=user.id,
718 topic_action=NotificationTopicAction.api_key__create,
719 key="",
720 data=notification_data_pb2.ApiKeyCreate(
721 api_key=token,
722 expiry=Timestamp_from_datetime(expiry),
723 ),
724 )
726 return admin_pb2.CreateApiKeyRes()
728 def GetChats(
729 self, request: admin_pb2.GetChatsReq, context: CouchersContext, session: Session
730 ) -> admin_pb2.GetChatsRes:
731 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
732 if not user: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true
733 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
735 # Cache for ChatUserInfo to avoid recomputing for the same user
736 user_info_cache = {}
738 def get_chat_user_info(user_id: int) -> admin_pb2.ChatUserInfo:
739 if user_id not in user_info_cache: 739 ↛ 748line 739 didn't jump to line 748 because the condition on line 739 was always true
740 u = session.execute(select(User).where(User.id == user_id)).scalar_one()
741 user_info_cache[user_id] = admin_pb2.ChatUserInfo(
742 user_id=u.id,
743 username=u.username,
744 name=u.name,
745 birthdate=date_to_api(u.birthdate),
746 gender=u.gender,
747 )
748 return user_info_cache[user_id]
750 def message_to_pb(message: Message) -> admin_pb2.ChatMessage:
751 return admin_pb2.ChatMessage(
752 message_id=message.id,
753 author=get_chat_user_info(message.author_id),
754 time=Timestamp_from_datetime(message.time),
755 message_type=message.message_type.name if message.message_type else "",
756 text=message.text or "",
757 host_request_status_target=(
758 message.host_request_status_target.name if message.host_request_status_target else ""
759 ),
760 target=get_chat_user_info(message.target_id) if message.target_id else None,
761 )
763 def get_messages_for_conversation(conversation_id: int) -> list[admin_pb2.ChatMessage]:
764 messages = (
765 session.execute(
766 select(Message).where(Message.conversation_id == conversation_id).order_by(Message.id.asc())
767 )
768 .scalars()
769 .all()
770 )
771 return [message_to_pb(msg) for msg in messages]
773 def get_host_request_pb(host_request: HostRequest) -> admin_pb2.AdminHostRequest:
774 return admin_pb2.AdminHostRequest(
775 host_request_id=host_request.conversation_id,
776 surfer=get_chat_user_info(host_request.initiator_user_id),
777 host=get_chat_user_info(host_request.recipient_user_id),
778 status=host_request.status.name if host_request.status else "",
779 from_date=date_to_api(host_request.from_date),
780 to_date=date_to_api(host_request.to_date),
781 created=Timestamp_from_datetime(host_request.conversation.created),
782 messages=get_messages_for_conversation(host_request.conversation_id),
783 )
785 def get_group_chat_pb(group_chat: GroupChat) -> admin_pb2.AdminGroupChat:
786 subs = (
787 session.execute(
788 select(GroupChatSubscription)
789 .where(GroupChatSubscription.group_chat_id == group_chat.conversation_id)
790 .order_by(GroupChatSubscription.joined.asc())
791 )
792 .scalars()
793 .all()
794 )
795 members = [
796 admin_pb2.GroupChatMember(
797 user=get_chat_user_info(sub.user_id),
798 joined=Timestamp_from_datetime(sub.joined),
799 left=Timestamp_from_datetime(sub.left) if sub.left else None,
800 role=sub.role.name if sub.role else "",
801 )
802 for sub in subs
803 ]
804 return admin_pb2.AdminGroupChat(
805 group_chat_id=group_chat.conversation_id,
806 title=group_chat.title or "",
807 is_dm=group_chat.is_dm,
808 creator=get_chat_user_info(group_chat.creator_id),
809 members=members,
810 messages=get_messages_for_conversation(group_chat.conversation_id),
811 )
813 # Get all host requests for the user
814 host_requests = (
815 session.execute(
816 select(HostRequest)
817 .where(or_(HostRequest.recipient_user_id == user.id, HostRequest.initiator_user_id == user.id))
818 .order_by(HostRequest.conversation_id.desc())
819 )
820 .scalars()
821 .all()
822 )
824 # Get all group chats for the user
825 group_chat_ids = (
826 session.execute(
827 select(GroupChatSubscription.group_chat_id)
828 .where(GroupChatSubscription.user_id == user.id)
829 .order_by(GroupChatSubscription.joined.desc())
830 )
831 .scalars()
832 .all()
833 )
834 group_chats = (
835 session.execute(select(GroupChat).where(GroupChat.conversation_id.in_(group_chat_ids))).scalars().all()
836 )
838 # Build protobuf objects, then sort by latest message time (most recent first)
839 host_request_pbs = [get_host_request_pb(hr) for hr in host_requests]
840 host_request_pbs.sort(key=lambda hr: hr.messages[-1].time.seconds if hr.messages else 0, reverse=True)
842 group_chat_pbs = [get_group_chat_pb(gc) for gc in group_chats]
843 group_chat_pbs.sort(key=lambda gc: gc.messages[-1].time.seconds if gc.messages else 0, reverse=True)
845 return admin_pb2.GetChatsRes(
846 user=get_chat_user_info(user.id),
847 host_requests=host_request_pbs,
848 group_chats=group_chat_pbs,
849 )
851 def DeleteEvent(
852 self, request: admin_pb2.DeleteEventReq, context: CouchersContext, session: Session
853 ) -> empty_pb2.Empty:
854 res = session.execute(
855 select(Event, EventOccurrence)
856 .where(EventOccurrence.id == request.event_id)
857 .where(EventOccurrence.event_id == Event.id)
858 .where(~EventOccurrence.is_deleted)
859 ).one_or_none()
861 if not res: 861 ↛ 862line 861 didn't jump to line 862 because the condition on line 861 was never true
862 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "event_not_found")
864 event, occurrence = res
866 occurrence.is_deleted = True
868 queue_job(
869 session,
870 job=generate_event_delete_notifications,
871 payload=jobs_pb2.GenerateEventDeleteNotificationsPayload(
872 occurrence_id=occurrence.id,
873 ),
874 )
876 return empty_pb2.Empty()
878 def ListUserIds(
879 self, request: admin_pb2.ListUserIdsReq, context: CouchersContext, session: Session
880 ) -> admin_pb2.ListUserIdsRes:
881 start_date = to_aware_datetime(request.start_time)
882 end_date = to_aware_datetime(request.end_time)
884 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
885 next_user_id = int(request.page_token) if request.page_token else 0
887 user_ids = (
888 session.execute(
889 select(User.id)
890 .where(or_(User.id <= next_user_id, to_bool(next_user_id == 0)))
891 .where(User.joined >= start_date)
892 .where(User.joined <= end_date)
893 .order_by(User.id.desc())
894 .limit(page_size + 1)
895 )
896 .scalars()
897 .all()
898 )
900 return admin_pb2.ListUserIdsRes(
901 user_ids=user_ids[:page_size],
902 next_page_token=str(user_ids[-1]) if len(user_ids) > page_size else None,
903 )
905 def EditReferenceText(
906 self, request: admin_pb2.EditReferenceTextReq, context: CouchersContext, session: Session
907 ) -> empty_pb2.Empty:
908 reference = session.execute(select(Reference).where(Reference.id == request.reference_id)).scalar_one_or_none()
910 if reference is None: 910 ↛ 911line 910 didn't jump to line 911 because the condition on line 910 was never true
911 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:reference_not_found")
913 if not request.new_text.strip(): 913 ↛ 914line 913 didn't jump to line 914 because the condition on line 913 was never true
914 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "reference_no_text")
916 reference.text = request.new_text.strip()
917 # Log action against the reference author
918 author = session.execute(select(User).where(User.id == reference.from_user_id)).scalar_one()
919 log_admin_action(session, context, author, "edit_reference", note=f"Edited reference {reference.id}")
920 return empty_pb2.Empty()
922 def DeleteReference(
923 self, request: admin_pb2.DeleteReferenceReq, context: CouchersContext, session: Session
924 ) -> empty_pb2.Empty:
925 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:deletereference_deprecated_use_ums")
927 def GetUserReferences(
928 self, request: admin_pb2.GetUserReferencesReq, context: CouchersContext, session: Session
929 ) -> admin_pb2.GetUserReferencesRes:
930 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
931 if not user:
932 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
934 references_from = (
935 session.execute(select(Reference).where(Reference.from_user_id == user.id).order_by(Reference.id.desc()))
936 .scalars()
937 .all()
938 )
940 references_to = (
941 session.execute(select(Reference).where(Reference.to_user_id == user.id).order_by(Reference.id.desc()))
942 .scalars()
943 .all()
944 )
946 return admin_pb2.GetUserReferencesRes(
947 references_from=[_reference_to_pb(ref) for ref in references_from],
948 references_to=[_reference_to_pb(ref) for ref in references_to],
949 )
951 def GetFriendRequests(
952 self, request: admin_pb2.GetFriendRequestsReq, context: CouchersContext, session: Session
953 ) -> admin_pb2.GetFriendRequestsRes:
954 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
955 if not user:
956 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
958 user_info_cache: dict[int, admin_pb2.ChatUserInfo] = {}
960 def get_chat_user_info(user_id: int) -> admin_pb2.ChatUserInfo:
961 if user_id not in user_info_cache:
962 u = session.execute(select(User).where(User.id == user_id)).scalar_one()
963 user_info_cache[user_id] = admin_pb2.ChatUserInfo(
964 user_id=u.id,
965 username=u.username,
966 name=u.name,
967 birthdate=date_to_api(u.birthdate),
968 gender=u.gender,
969 )
970 return user_info_cache[user_id]
972 def friend_request_to_pb(rel: FriendRelationship) -> admin_pb2.AdminFriendRequest:
973 return admin_pb2.AdminFriendRequest(
974 friend_request_id=rel.id,
975 from_user=get_chat_user_info(rel.from_user_id),
976 to_user=get_chat_user_info(rel.to_user_id),
977 status=rel.status.name if rel.status else "",
978 time_sent=Timestamp_from_datetime(rel.time_sent),
979 time_responded=Timestamp_from_datetime(rel.time_responded) if rel.time_responded else None,
980 moderation_visibility=rel.moderation_state.visibility.name,
981 )
983 sent = (
984 session.execute(
985 select(FriendRelationship)
986 .where(FriendRelationship.from_user_id == user.id)
987 .order_by(FriendRelationship.id.desc())
988 )
989 .scalars()
990 .all()
991 )
993 received = (
994 session.execute(
995 select(FriendRelationship)
996 .where(FriendRelationship.to_user_id == user.id)
997 .order_by(FriendRelationship.id.desc())
998 )
999 .scalars()
1000 .all()
1001 )
1003 return admin_pb2.GetFriendRequestsRes(
1004 sent=[friend_request_to_pb(rel) for rel in sent],
1005 received=[friend_request_to_pb(rel) for rel in received],
1006 )
1008 def GetNonvisibleUserAccessLog(
1009 self, request: admin_pb2.GetNonvisibleUserAccessLogReq, context: CouchersContext, session: Session
1010 ) -> admin_pb2.GetNonvisibleUserAccessLogRes:
1011 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1012 if not user: 1012 ↛ 1013line 1012 didn't jump to line 1013 because the condition on line 1012 was never true
1013 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1015 actor = aliased(User)
1016 rows = session.execute(
1017 select(NonvisibleUserAccess, actor.username)
1018 .outerjoin(actor, NonvisibleUserAccess.actor_user_id == actor.id)
1019 .where(NonvisibleUserAccess.target_user_id == user.id)
1020 .order_by(NonvisibleUserAccess.time.desc())
1021 .limit(MAX_PAGINATION_LENGTH)
1022 ).all()
1024 return admin_pb2.GetNonvisibleUserAccessLogRes(
1025 entries=[
1026 admin_pb2.NonvisibleUserAccessLogEntry(
1027 time=Timestamp_from_datetime(access.time),
1028 access_type=nonvisibleuseraccesstype2api[access.access_type],
1029 target_state=nonvisibleuserstate2api[access.target_state],
1030 target_user_id=access.target_user_id,
1031 actor_user_id=Int64Value(value=access.actor_user_id) if access.actor_user_id is not None else None,
1032 actor_username=actor_username or "",
1033 ip_address=access.ip_address or "",
1034 user_agent=access.user_agent or "",
1035 sofa=access.sofa or "",
1036 )
1037 for access, actor_username in rows
1038 ]
1039 )
1041 def EditDiscussion(
1042 self, request: admin_pb2.EditDiscussionReq, context: CouchersContext, session: Session
1043 ) -> empty_pb2.Empty:
1044 discussion = session.execute(
1045 select(Discussion).where(Discussion.id == request.discussion_id)
1046 ).scalar_one_or_none()
1047 if not discussion:
1048 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")
1049 if request.new_title:
1050 discussion.title = request.new_title.strip()
1051 if request.new_content:
1052 discussion.content = request.new_content.strip()
1053 return empty_pb2.Empty()
1055 def DeleteDiscussion(
1056 self, request: admin_pb2.AdminDeleteDiscussionReq, context: CouchersContext, session: Session
1057 ) -> empty_pb2.Empty:
1058 discussion = session.execute(
1059 select(Discussion).where(Discussion.id == request.discussion_id)
1060 ).scalar_one_or_none()
1061 if not discussion: 1061 ↛ 1062line 1061 didn't jump to line 1062 because the condition on line 1061 was never true
1062 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "discussion_not_found")
1063 if discussion.deleted is not None: 1063 ↛ 1064line 1063 didn't jump to line 1064 because the condition on line 1063 was never true
1064 return empty_pb2.Empty()
1065 session.add(
1066 DiscussionVersion(
1067 discussion_id=discussion.id,
1068 editor_user_id=context.user_id,
1069 change_type=ContentChangeType.delete,
1070 old_title=discussion.title,
1071 new_title=None,
1072 old_content=discussion.content,
1073 new_content=None,
1074 )
1075 )
1076 discussion.deleted = now()
1077 return empty_pb2.Empty()
1079 def EditReply(self, request: admin_pb2.EditReplyReq, context: CouchersContext, session: Session) -> empty_pb2.Empty:
1080 database_id, depth = unpack_thread_id(request.reply_id)
1081 if depth == 1:
1082 obj: Comment | Reply | None = session.execute(
1083 select(Comment).where(Comment.id == database_id)
1084 ).scalar_one_or_none()
1085 elif depth == 2:
1086 obj = session.execute(select(Reply).where(Reply.id == database_id)).scalar_one_or_none()
1087 else:
1088 obj = None
1090 if not obj:
1091 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:object_not_found")
1092 old_content = obj.content
1093 new_content = request.new_content.strip()
1094 if depth == 1:
1095 session.add(
1096 CommentVersion(
1097 comment_id=database_id,
1098 editor_user_id=context.user_id,
1099 change_type=ContentChangeType.edit,
1100 old_content=old_content,
1101 new_content=new_content,
1102 )
1103 )
1104 else:
1105 session.add(
1106 ReplyVersion(
1107 reply_id=database_id,
1108 editor_user_id=context.user_id,
1109 change_type=ContentChangeType.edit,
1110 old_content=old_content,
1111 new_content=new_content,
1112 )
1113 )
1114 obj.content = new_content
1115 return empty_pb2.Empty()
1117 def AddUsersToModerationUserList(
1118 self, request: admin_pb2.AddUsersToModerationUserListReq, context: CouchersContext, session: Session
1119 ) -> admin_pb2.AddUsersToModerationUserListRes:
1120 """Add multiple users to a moderation user list. If no moderation list is provided, a new one is created.
1121 Id of the moderation list is returned."""
1122 req_users = request.users
1123 users = []
1125 for req_user in req_users:
1126 user = session.execute(select(User).where(username_or_email_or_id(req_user))).scalar_one_or_none()
1127 if not user:
1128 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1129 users.append(user)
1131 if request.moderation_list_id:
1132 moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
1133 if not moderation_user_list:
1134 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:moderation_user_list_not_found")
1135 # Create a new moderation user list if no one is provided
1136 else:
1137 moderation_user_list = ModerationUserList()
1138 session.add(moderation_user_list)
1139 session.flush()
1141 # Add users to the moderation list only if not already in it
1142 for user in users:
1143 if user not in moderation_user_list.users: 1143 ↛ 1145line 1143 didn't jump to line 1145 because the condition on line 1143 was always true
1144 moderation_user_list.users.append(user)
1145 log_admin_action(session, context, user, "add_to_moderation_list")
1147 return admin_pb2.AddUsersToModerationUserListRes(moderation_list_id=moderation_user_list.id)
1149 def ListModerationUserLists(
1150 self, request: admin_pb2.ListModerationUserListsReq, context: CouchersContext, session: Session
1151 ) -> admin_pb2.ListModerationUserListsRes:
1152 """Lists all moderation user lists for a user."""
1153 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1154 if not user: 1154 ↛ 1155line 1154 didn't jump to line 1155 because the condition on line 1154 was never true
1155 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1157 moderation_lists = [
1158 admin_pb2.ModerationList(
1159 moderation_list_id=ml.id,
1160 members=[_user_to_details(session, u) for u in ml.users],
1161 )
1162 for ml in user.moderation_user_lists
1163 ]
1164 return admin_pb2.ListModerationUserListsRes(moderation_lists=moderation_lists)
1166 def RemoveUserFromModerationUserList(
1167 self, request: admin_pb2.RemoveUserFromModerationUserListReq, context: CouchersContext, session: Session
1168 ) -> empty_pb2.Empty:
1169 """Removes a user from a provided moderation user list."""
1170 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1171 if not user:
1172 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1173 if not request.moderation_list_id:
1174 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:missing_moderation_user_list_id")
1176 moderation_user_list = session.get(ModerationUserList, request.moderation_list_id)
1177 if not moderation_user_list: 1177 ↛ 1178line 1177 didn't jump to line 1178 because the condition on line 1177 was never true
1178 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:moderation_user_list_not_found")
1179 if user not in moderation_user_list.users:
1180 context.abort_with_error_code(
1181 grpc.StatusCode.FAILED_PRECONDITION, "admin:user_not_in_the_moderation_user_list"
1182 )
1184 moderation_user_list.users.remove(user)
1185 log_admin_action(session, context, user, "remove_from_moderation_list")
1187 if len(moderation_user_list.users) == 0:
1188 session.delete(moderation_user_list)
1190 return empty_pb2.Empty()
1192 def CreateAccountDeletionLink(
1193 self, request: admin_pb2.CreateAccountDeletionLinkReq, context: CouchersContext, session: Session
1194 ) -> admin_pb2.CreateAccountDeletionLinkRes:
1195 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1196 if not user: 1196 ↛ 1197line 1196 didn't jump to line 1197 because the condition on line 1196 was never true
1197 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1198 token = AccountDeletionToken(token=urlsafe_secure_token(), user_id=user.id, expiry=now() + timedelta(hours=2))
1199 session.add(token)
1200 log_admin_action(session, context, user, "create_account_deletion_link", level=AdminActionLevel.high)
1201 return admin_pb2.CreateAccountDeletionLinkRes(
1202 account_deletion_confirm_url=urls.delete_account_link(account_deletion_token=token.token)
1203 )
1205 def AccessStats(
1206 self, request: admin_pb2.AccessStatsReq, context: CouchersContext, session: Session
1207 ) -> admin_pb2.AccessStatsRes:
1208 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1209 if not user: 1209 ↛ 1210line 1209 didn't jump to line 1210 because the condition on line 1209 was never true
1210 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1212 start_time = (
1213 to_aware_datetime(request.start_time) if request.HasField("start_time") else now() - timedelta(days=90)
1214 )
1215 end_time = to_aware_datetime(request.end_time) if request.HasField("end_time") else now()
1217 user_activity = session.execute(
1218 select(
1219 UserActivity.ip_address,
1220 UserActivity.user_agent,
1221 func.sum(UserActivity.api_calls),
1222 func.count(UserActivity.period),
1223 func.min(UserActivity.period),
1224 func.max(UserActivity.period),
1225 )
1226 .where(UserActivity.user_id == user.id)
1227 .where(UserActivity.period >= start_time)
1228 .where(UserActivity.period <= end_time)
1229 .order_by(func.max(UserActivity.period).desc())
1230 .group_by(UserActivity.ip_address, UserActivity.user_agent)
1231 ).all()
1233 out = admin_pb2.AccessStatsRes()
1235 for ip_address, user_agent, api_call_count, periods_count, first_seen, last_seen in user_activity:
1236 ip_address_str = str(ip_address) if ip_address is not None else None
1237 user_agent_data = user_agents_parse(user_agent or "")
1238 asn = geoip_asn(ip_address_str)
1239 out.stats.append(
1240 admin_pb2.AccessStat(
1241 ip_address=ip_address_str,
1242 asn=str(asn[0]) if asn else None,
1243 asorg=str(asn[1]) if asn else None,
1244 asnetwork=str(asn[2]) if asn else None,
1245 user_agent=user_agent,
1246 operating_system=user_agent_data.os.family,
1247 browser=user_agent_data.browser.family,
1248 device=user_agent_data.device.family,
1249 approximate_location=geoip_approximate_location(ip_address_str) or "Unknown",
1250 api_call_count=api_call_count,
1251 periods_count=periods_count,
1252 first_seen=Timestamp_from_datetime(first_seen),
1253 last_seen=Timestamp_from_datetime(last_seen),
1254 )
1255 )
1257 return out
1259 def SetLastDonated(
1260 self, request: admin_pb2.SetLastDonatedReq, context: CouchersContext, session: Session
1261 ) -> admin_pb2.UserDetails:
1262 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1263 if not user:
1264 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1266 if request.HasField("last_donated"):
1267 user.last_donated = to_aware_datetime(request.last_donated)
1268 else:
1269 user.last_donated = None
1271 log_admin_action(session, context, user, "set_last_donated")
1272 return _user_to_details(session, user)
1274 def CreateAdminTag(
1275 self, request: admin_pb2.CreateAdminTagReq, context: CouchersContext, session: Session
1276 ) -> admin_pb2.AdminTagInfo:
1277 if not request.tag.strip():
1278 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:tag_cant_be_empty")
1279 existing = session.execute(select(AdminTag).where(AdminTag.tag == request.tag.strip())).scalar_one_or_none()
1280 if existing:
1281 context.abort_with_error_code(grpc.StatusCode.ALREADY_EXISTS, "admin:tag_already_exists")
1282 admin_tag = AdminTag(tag=request.tag.strip())
1283 session.add(admin_tag)
1284 session.flush()
1285 return admin_pb2.AdminTagInfo(admin_tag_id=admin_tag.id, tag=admin_tag.tag)
1287 def ListAdminTags(
1288 self, request: admin_pb2.ListAdminTagsReq, context: CouchersContext, session: Session
1289 ) -> admin_pb2.ListAdminTagsRes:
1290 tags = session.execute(select(AdminTag).order_by(AdminTag.tag)).scalars().all()
1291 return admin_pb2.ListAdminTagsRes(
1292 tags=[admin_pb2.AdminTagInfo(admin_tag_id=tag.id, tag=tag.tag) for tag in tags]
1293 )
1295 def AddAdminTagToUser(
1296 self, request: admin_pb2.AddAdminTagToUserReq, context: CouchersContext, session: Session
1297 ) -> admin_pb2.UserDetails:
1298 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1299 if not user: 1299 ↛ 1300line 1299 didn't jump to line 1300 because the condition on line 1299 was never true
1300 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1301 admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none()
1302 if not admin_tag:
1303 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:tag_not_found")
1304 existing = session.execute(
1305 select(UserAdminTag).where(UserAdminTag.user_id == user.id, UserAdminTag.admin_tag_id == admin_tag.id)
1306 ).scalar_one_or_none()
1307 if existing:
1308 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_already_has_admin_tag")
1309 session.add(UserAdminTag(user_id=user.id, admin_tag_id=admin_tag.id))
1310 session.flush()
1311 log_admin_action(session, context, user, "add_tag", tag=request.tag)
1312 return _user_to_details(session, user)
1314 def RemoveAdminTagFromUser(
1315 self, request: admin_pb2.RemoveAdminTagFromUserReq, context: CouchersContext, session: Session
1316 ) -> admin_pb2.UserDetails:
1317 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1318 if not user: 1318 ↛ 1319line 1318 didn't jump to line 1319 because the condition on line 1318 was never true
1319 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1320 admin_tag = session.execute(select(AdminTag).where(AdminTag.tag == request.tag)).scalar_one_or_none()
1321 if not admin_tag: 1321 ↛ 1322line 1321 didn't jump to line 1322 because the condition on line 1321 was never true
1322 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:tag_not_found")
1323 user_admin_tag = session.execute(
1324 select(UserAdminTag).where(UserAdminTag.user_id == user.id, UserAdminTag.admin_tag_id == admin_tag.id)
1325 ).scalar_one_or_none()
1326 if not user_admin_tag:
1327 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:user_does_not_have_admin_tag")
1328 session.delete(user_admin_tag)
1329 session.flush()
1330 log_admin_action(session, context, user, "remove_tag", tag=request.tag)
1331 return _user_to_details(session, user)
1333 def SetModScore(
1334 self, request: admin_pb2.SetModScoreReq, context: CouchersContext, session: Session
1335 ) -> admin_pb2.UserDetails:
1336 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1337 if not user:
1338 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1339 user.mod_score = request.mod_score
1340 log_admin_action(session, context, user, "set_mod_score", note=f"mod_score={request.mod_score}")
1341 return _user_to_details(session, user)
1343 def ListAdminActions(
1344 self, request: admin_pb2.ListAdminActionsReq, context: CouchersContext, session: Session
1345 ) -> admin_pb2.ListAdminActionsRes:
1346 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
1348 admin_user = aliased(User)
1349 target_user = aliased(User)
1351 statement = (
1352 select(AdminAction, admin_user.username, target_user.username)
1353 .join(admin_user, AdminAction.admin_user_id == admin_user.id)
1354 .join(target_user, AdminAction.target_user_id == target_user.id)
1355 )
1357 if request.admin_user_id:
1358 statement = statement.where(AdminAction.admin_user_id == request.admin_user_id)
1359 if request.target_user_id:
1360 statement = statement.where(AdminAction.target_user_id == request.target_user_id)
1361 if request.page_token:
1362 statement = statement.where(AdminAction.id < int(request.page_token))
1364 statement = statement.order_by(AdminAction.id.desc()).limit(page_size + 1)
1366 rows = session.execute(statement).all()
1368 action_pbs = [
1369 admin_pb2.AdminActionLog(
1370 admin_action_id=action.id,
1371 created=Timestamp_from_datetime(action.created),
1372 admin_user_id=action.admin_user_id,
1373 admin_username=admin_username,
1374 action_type=action.action_type,
1375 level=adminactionlevel2api[action.level],
1376 note=action.note or "",
1377 data=json.dumps(action.data) if action.data is not None else "",
1378 tag=action.tag or "",
1379 target_user_id=action.target_user_id,
1380 target_username=target_username,
1381 )
1382 for action, admin_username, target_username in rows[:page_size]
1383 ]
1385 return admin_pb2.ListAdminActionsRes(
1386 admin_actions=action_pbs,
1387 next_page_token=str(rows[page_size - 1][0].id) if len(rows) > page_size else None,
1388 )
1390 def ListUserUploads(
1391 self, request: admin_pb2.ListUserUploadsReq, context: CouchersContext, session: Session
1392 ) -> admin_pb2.ListUserUploadsRes:
1393 user = session.execute(select(User).where(username_or_email_or_id(request.user))).scalar_one_or_none()
1394 if not user:
1395 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")
1397 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
1399 statement = select(Upload).where(Upload.creator_user_id == user.id)
1400 if request.page_token:
1401 cursor_created = session.execute(
1402 select(Upload.created).where(Upload.key == request.page_token)
1403 ).scalar_one()
1404 statement = statement.where(tuple_(Upload.created, Upload.key) < (cursor_created, request.page_token))
1406 uploads = (
1407 session.execute(statement.order_by(Upload.created.desc(), Upload.key.desc()).limit(page_size + 1))
1408 .scalars()
1409 .all()
1410 )
1412 page = uploads[:page_size]
1413 uses_by_key = get_upload_uses_for_keys(session, [upload.key for upload in page])
1415 return admin_pb2.ListUserUploadsRes(
1416 uploads=[
1417 admin_pb2.UserUpload(
1418 key=upload.key,
1419 filename=upload.filename,
1420 full_url=upload.full_url,
1421 thumbnail_url=upload.thumbnail_url,
1422 credit=upload.credit or "",
1423 created=Timestamp_from_datetime(upload.created),
1424 uses=[
1425 admin_pb2.UploadUse(
1426 type=uploadusetype2api[use.use_type],
1427 is_current=use.is_current,
1428 user_id=use.user_id,
1429 event_id=use.event_id,
1430 page_id=use.page_id,
1431 url=use.url,
1432 )
1433 for use in uses_by_key.get(upload.key, [])
1434 ],
1435 )
1436 for upload in page
1437 ],
1438 next_page_token=uploads[page_size - 1].key if len(uploads) > page_size else None,
1439 )
1441 def CreateOTAPackage(
1442 self, request: admin_pb2.CreateOTAPackageReq, context: CouchersContext, session: Session
1443 ) -> admin_pb2.OTAPackage:
1444 platform = api2otaplatform.get(request.platform)
1445 if platform is None: 1445 ↛ 1446line 1445 didn't jump to line 1446 because the condition on line 1445 was never true
1446 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_platform")
1448 if not request.version:
1449 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_version")
1451 existing = session.execute(
1452 select(OTAPackage.id).where(OTAPackage.platform == platform).where(OTAPackage.version == request.version)
1453 ).scalar_one_or_none()
1454 if existing is not None:
1455 context.abort_with_error_code(grpc.StatusCode.FAILED_PRECONDITION, "admin:ota_package_already_exists")
1457 # Read the keying/ordering fields out of the manifest we're about to serve, so the row can't
1458 # disagree with the bytes on the CDN.
1459 cdn_root = context.get_string_value("native_ota_cdn_root", "https://cdn.couchers.org/native/ota")
1460 _content_type, body = _fetch_signed_manifest(
1461 _native_ota_manifest_url(cdn_root=cdn_root, version=request.version, platform=platform.name)
1462 )
1463 manifest = _extract_ota_manifest(body)
1464 fingerprint = manifest.get("runtimeVersion") if manifest else None
1465 manifest_id = manifest.get("id") if manifest else None
1466 created_at_raw = manifest.get("createdAt") if manifest else None
1467 if (
1468 manifest is None
1469 or not isinstance(fingerprint, str)
1470 or not fingerprint
1471 or not isinstance(manifest_id, str)
1472 or not manifest_id
1473 or not isinstance(created_at_raw, str)
1474 or not created_at_raw
1475 ):
1476 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_manifest")
1477 try:
1478 manifest_created_at = datetime.fromisoformat(created_at_raw)
1479 except ValueError:
1480 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_manifest")
1481 if manifest_created_at.tzinfo is None: 1481 ↛ 1482line 1481 didn't jump to line 1482 because the condition on line 1481 was never true
1482 manifest_created_at = manifest_created_at.replace(tzinfo=UTC)
1484 package = OTAPackage(
1485 creator_user_id=context.user_id,
1486 platform=platform,
1487 fingerprint=fingerprint,
1488 version=request.version,
1489 manifest_created_at=manifest_created_at,
1490 manifest_id=manifest_id,
1491 )
1492 session.add(package)
1493 session.flush()
1495 return _ota_package_to_pb(package, _live_ota_package_ids(session))
1497 def ListOTAPackages(
1498 self, request: admin_pb2.ListOTAPackagesReq, context: CouchersContext, session: Session
1499 ) -> admin_pb2.ListOTAPackagesRes:
1500 statement = select(OTAPackage).order_by(OTAPackage.manifest_created_at.desc(), OTAPackage.id.desc())
1501 if request.platform != admin_pb2.OTA_PLATFORM_UNSPECIFIED:
1502 platform = api2otaplatform.get(request.platform)
1503 if platform is None: 1503 ↛ 1504line 1503 didn't jump to line 1504 because the condition on line 1503 was never true
1504 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:invalid_ota_platform")
1505 statement = statement.where(OTAPackage.platform == platform)
1506 if request.fingerprint: 1506 ↛ 1507line 1506 didn't jump to line 1507 because the condition on line 1506 was never true
1507 statement = statement.where(OTAPackage.fingerprint == request.fingerprint)
1508 if not request.include_banned:
1509 statement = statement.where(OTAPackage.banned_at.is_(None))
1511 packages = session.execute(statement).scalars().all()
1512 live_ids = _live_ota_package_ids(session)
1513 return admin_pb2.ListOTAPackagesRes(packages=[_ota_package_to_pb(package, live_ids) for package in packages])
1515 def BanOTAPackage(
1516 self, request: admin_pb2.BanOTAPackageReq, context: CouchersContext, session: Session
1517 ) -> admin_pb2.OTAPackage:
1518 # Bans are irreversible — to roll back an accidental ban, republish the bundle as a new
1519 # package — so a reason is required for the audit trail.
1520 if not request.reason.strip():
1521 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:ota_ban_reason_required")
1523 package = session.execute(
1524 select(OTAPackage).where(OTAPackage.id == request.ota_package_id)
1525 ).scalar_one_or_none()
1526 if package is None:
1527 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:ota_package_not_found")
1529 if package.banned_at is None: 1529 ↛ 1533line 1529 didn't jump to line 1533 because the condition on line 1529 was always true
1530 package.banned_at = now()
1531 package.banned_by_user_id = context.user_id
1532 package.banned_reason = request.reason
1533 session.flush()
1535 return _ota_package_to_pb(package, _live_ota_package_ids(session))