Coverage for app/backend/src/couchers/servicers/moderation.py: 93%
263 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1import logging
3import grpc
4from sqlalchemy import and_, exists, not_, or_, select
5from sqlalchemy.orm import Session
7from couchers.context import CouchersContext
8from couchers.jobs.enqueue import queue_job
9from couchers.metrics import (
10 observe_moderation_action,
11 observe_moderation_queue_item_created,
12 observe_moderation_queue_item_resolved,
13 observe_moderation_queue_resolution_time,
14 observe_moderation_visibility_transition,
15)
16from couchers.models import (
17 AdminActionLevel,
18 Comment,
19 Discussion,
20 Event,
21 EventOccurrence,
22 FriendRelationship,
23 GroupChat,
24 HostRequest,
25 Message,
26 MessageType,
27 ModerationAction,
28 ModerationLog,
29 ModerationObjectType,
30 ModerationQueueItem,
31 ModerationState,
32 ModerationTrigger,
33 ModerationVisibility,
34 Notification,
35 NotificationDelivery,
36 Reference,
37 Reply,
38 User,
39 get_moderated_models,
40)
41from couchers.proto import moderation_pb2, moderation_pb2_grpc
42from couchers.proto.internal import jobs_pb2
43from couchers.utils import Timestamp_from_datetime, now
logger = logging.getLogger(__name__)

# Upper bound on the page size of the paginated RPCs in this module; also used when a request leaves page_size unset
MAX_PAGINATION_LENGTH = 1_000

# Moderation enum mappings.
#
# Each `*2api` table maps the SQL-side enum member (or None) to its protobuf enum value. The matching `*2sql`
# table is the exact inverse and is derived from it, so the two directions cannot drift apart.

moderationvisibility2api = {
    None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED,
    ModerationVisibility.hidden: moderation_pb2.MODERATION_VISIBILITY_HIDDEN,
    ModerationVisibility.shadowed: moderation_pb2.MODERATION_VISIBILITY_SHADOWED,
    ModerationVisibility.visible: moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
    ModerationVisibility.unlisted: moderation_pb2.MODERATION_VISIBILITY_UNLISTED,
}
moderationvisibility2sql = {api_value: sql_value for sql_value, api_value in moderationvisibility2api.items()}

moderationtrigger2api = {
    None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED,
    ModerationTrigger.initial_review: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW,
    ModerationTrigger.user_flag: moderation_pb2.MODERATION_TRIGGER_USER_FLAG,
    ModerationTrigger.machine_flag: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG,
    ModerationTrigger.moderator_review: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW,
}
moderationtrigger2sql = {api_value: sql_value for sql_value, api_value in moderationtrigger2api.items()}

moderationaction2api = {
    None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED,
    ModerationAction.create: moderation_pb2.MODERATION_ACTION_CREATE,
    ModerationAction.approve: moderation_pb2.MODERATION_ACTION_APPROVE,
    ModerationAction.hide: moderation_pb2.MODERATION_ACTION_HIDE,
    ModerationAction.flag: moderation_pb2.MODERATION_ACTION_FLAG,
    ModerationAction.unflag: moderation_pb2.MODERATION_ACTION_UNFLAG,
    ModerationAction.set_priority: moderation_pb2.MODERATION_ACTION_SET_PRIORITY,
    ModerationAction.bulk_set_visibility: moderation_pb2.MODERATION_ACTION_BULK_SET_VISIBILITY,
}
moderationaction2sql = {api_value: sql_value for sql_value, api_value in moderationaction2api.items()}

moderationobjecttype2api = {
    None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED,
    ModerationObjectType.host_request: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST,
    ModerationObjectType.group_chat: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT,
    ModerationObjectType.friend_request: moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST,
    ModerationObjectType.event_occurrence: moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE,
    ModerationObjectType.comment: moderation_pb2.MODERATION_OBJECT_TYPE_COMMENT,
    ModerationObjectType.reply: moderation_pb2.MODERATION_OBJECT_TYPE_REPLY,
    ModerationObjectType.discussion: moderation_pb2.MODERATION_OBJECT_TYPE_DISCUSSION,
    ModerationObjectType.reference: moderation_pb2.MODERATION_OBJECT_TYPE_REFERENCE,
}
moderationobjecttype2sql = {api_value: sql_value for sql_value, api_value in moderationobjecttype2api.items()}
def _resolve_queue_item(
    queue_item: ModerationQueueItem,
    log_entry: ModerationLog,
    action: ModerationAction,
    object_type: ModerationObjectType,
) -> None:
    """Mark ``queue_item`` as resolved by ``log_entry`` and emit the resolution metrics.

    Stores the log entry's id on the queue item, then reports the resolution itself and the number of seconds
    elapsed since the item was created, passing the item's trigger, the action and the object type to both metrics.
    """
    queue_item.resolved_by_log_id = log_entry.id

    metric_args = (queue_item.trigger, action, object_type)
    observe_moderation_queue_item_resolved(*metric_args)

    seconds_open = (now() - queue_item.time_created).total_seconds()
    observe_moderation_queue_resolution_time(*metric_args, seconds_open)
def bulk_set_user_content_visibility(
    session: Session,
    user: User,
    new_visibility: ModerationVisibility,
    moderator_user_id: int,
    from_visibilities: set[ModerationVisibility] | None = None,
    reason: str | None = None,
) -> int:
    """Move every moderation state authored by ``user`` to ``new_visibility``.

    A state is left untouched when it already has the target visibility, or when ``from_visibilities`` is
    non-empty and the state's current visibility is not in it. For each state that does change, this writes a
    ``bulk_set_visibility`` log entry, resolves all of its open queue items against that entry, records metrics,
    and (when the new visibility is visible or unlisted) re-queues its not-yet-delivered notifications.

    Returns the number of states whose visibility was changed.
    """
    log_reason = reason if reason else f"Bulk visibility update for user {user.id} to {new_visibility.name}"
    bulk_action = ModerationAction.bulk_set_visibility

    # One EXISTS per moderated model: the state belongs to the user if any model row links it to them as author
    authored_by_user = or_(
        *[
            exists().where(and_(model.moderation_state_id_column == ModerationState.id, model.author_column == user.id))
            for model in get_moderated_models().values()
        ]
    )
    candidate_states = session.execute(select(ModerationState).where(authored_by_user)).scalars().all()

    changed = 0
    for state in candidate_states:
        previous_visibility = state.visibility
        excluded_by_filter = bool(from_visibilities) and previous_visibility not in from_visibilities
        if excluded_by_filter or previous_visibility == new_visibility:
            continue

        state.visibility = new_visibility
        state.updated = now()

        bulk_log = ModerationLog(
            moderation_state_id=state.id,
            action=bulk_action,
            moderator_user_id=moderator_user_id,
            new_visibility=new_visibility,
            reason=log_reason,
        )
        session.add(bulk_log)
        # Flush so the log entry has an id before queue items are pointed at it
        session.flush()

        unresolved_items_query = (
            select(ModerationQueueItem)
            .where(ModerationQueueItem.moderation_state_id == state.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_(None))
        )
        for open_item in session.execute(unresolved_items_query).scalars().all():
            _resolve_queue_item(open_item, bulk_log, bulk_action, state.object_type)
        session.flush()

        observe_moderation_action(bulk_action, state.object_type)
        observe_moderation_visibility_transition(previous_visibility, new_visibility, state.object_type)

        if new_visibility in (ModerationVisibility.visible, ModerationVisibility.unlisted):
            _enqueue_pending_notifications(session, state.id)

        changed += 1

    return changed
def _enqueue_pending_notifications(session: Session, moderation_state_id: int) -> None:
    """Queue a ``handle_notification`` job for each notification on this moderation state that has no delivery yet."""
    has_delivery = exists().where(NotificationDelivery.notification_id == Notification.id)
    undelivered_query = (
        select(Notification).where(Notification.moderation_state_id == moderation_state_id).where(not_(has_delivery))
    )
    undelivered = session.execute(undelivered_query).scalars().all()

    # Import here to avoid circular dependency
    from couchers.notifications.background import handle_notification  # noqa: PLC0415

    for pending in undelivered:
        job_payload = jobs_pb2.HandleNotificationPayload(notification_id=pending.id)
        queue_job(session, job=handle_notification, payload=job_payload)
235def moderation_state_to_pb(state: ModerationState, session: Session) -> moderation_pb2.ModerationStateInfo:
236 """Convert ModerationState model to proto message"""
237 object_type = state.object_type
238 object_id = state.object_id
240 # Get the author user ID and content based on object type
241 if object_type == ModerationObjectType.host_request:
242 author_user_id = session.execute(
243 select(HostRequest.initiator_user_id).where(HostRequest.conversation_id == object_id)
244 ).scalar_one()
245 # Get the first text message for this conversation
246 content = session.execute(
247 select(Message.text)
248 .where(Message.conversation_id == object_id)
249 .where(Message.message_type == MessageType.text)
250 .order_by(Message.id.asc())
251 .limit(1)
252 ).scalar_one_or_none()
253 elif object_type == ModerationObjectType.group_chat:
254 author_user_id = session.execute(
255 select(GroupChat.creator_id).where(GroupChat.conversation_id == object_id)
256 ).scalar_one()
257 # Get the first text message for this conversation
258 content = session.execute(
259 select(Message.text)
260 .where(Message.conversation_id == object_id)
261 .where(Message.message_type == MessageType.text)
262 .order_by(Message.id.asc())
263 .limit(1)
264 ).scalar_one_or_none()
265 elif object_type == ModerationObjectType.friend_request:
266 author_user_id = session.execute(
267 select(FriendRelationship.from_user_id).where(FriendRelationship.id == object_id)
268 ).scalar_one()
269 # Friend requests have no text content
270 content = None
271 elif object_type == ModerationObjectType.event_occurrence:
272 author_user_id, title, description = session.execute(
273 select(EventOccurrence.creator_user_id, Event.title, EventOccurrence.content)
274 .join(Event, Event.id == EventOccurrence.event_id)
275 .where(EventOccurrence.id == object_id)
276 ).one()
277 content = f"{title}\n\n{description}"
278 elif object_type == ModerationObjectType.comment:
279 author_user_id, content = session.execute(
280 select(Comment.author_user_id, Comment.content).where(Comment.id == object_id)
281 ).one()
282 elif object_type == ModerationObjectType.reply:
283 author_user_id, content = session.execute(
284 select(Reply.author_user_id, Reply.content).where(Reply.id == object_id)
285 ).one()
286 elif object_type == ModerationObjectType.discussion:
287 author_user_id, title, body = session.execute(
288 select(Discussion.creator_user_id, Discussion.title, Discussion.content).where(Discussion.id == object_id)
289 ).one()
290 content = f"{title}\n\n{body}"
291 elif object_type == ModerationObjectType.reference: 291 ↛ 296line 291 didn't jump to line 296 because the condition on line 291 was always true
292 author_user_id, content = session.execute(
293 select(Reference.from_user_id, Reference.text).where(Reference.id == object_id)
294 ).one()
295 else:
296 raise ValueError(f"Unsupported moderation object type: {object_type}")
298 # Import here to avoid circular dependency
299 from couchers.servicers.admin import _user_to_details # noqa: PLC0415
301 author = session.execute(select(User).where(User.id == author_user_id)).scalar_one()
303 state_pb = moderation_pb2.ModerationStateInfo(
304 moderation_state_id=state.id,
305 object_type=moderationobjecttype2api[state.object_type],
306 object_id=state.object_id,
307 visibility=moderationvisibility2api[state.visibility],
308 created=Timestamp_from_datetime(state.created),
309 updated=Timestamp_from_datetime(state.updated),
310 author_user_id=author_user_id,
311 author=_user_to_details(session, author),
312 content=content or "",
313 )
315 return state_pb
class Moderation(moderation_pb2_grpc.ModerationServicer):
    """gRPC servicer for the moderation API: queue and state listing, log retrieval and moderation actions."""

    @staticmethod
    def _parse_page_token(page_token: str, context: CouchersContext) -> int:
        """Parse a page token into the integer row id it encodes, aborting with INVALID_ARGUMENT if malformed."""
        try:
            return int(page_token)
        except ValueError:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid page token.")

    @staticmethod
    def _authored_by_filter(moderation_state_id_expr, author_user_id: int):
        """Build a filter matching moderation states authored by the given user.

        Produces one EXISTS clause per moderated model (joining the model's moderation-state column to
        ``moderation_state_id_expr`` and its author column to ``author_user_id``) and ORs them together.
        """
        return or_(
            *[
                exists().where(
                    and_(
                        entry.moderation_state_id_column == moderation_state_id_expr,
                        entry.author_column == author_user_id,
                    )
                )
                for entry in get_moderated_models().values()
            ]
        )

    def GetModerationQueue(
        self, request: moderation_pb2.GetModerationQueueReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationQueueRes:
        """Get moderation queue items with optional filtering.

        Supports filtering by trigger, object type, resolution status, priority range, creation time range and
        item author. Results are ordered by creation time (newest or oldest first) and paginated.

        Aborts with INVALID_ARGUMENT on a malformed page token or an unknown object type.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

        # Build query
        statement = select(ModerationQueueItem)

        # Apply page token filter based on ordering direction.
        # NOTE(review): the cursor filters on id alone while rows are ordered by (time_created, id); this is only
        # a consistent keyset if ids increase together with time_created -- confirm.
        if request.page_token:
            page_token_id = self._parse_page_token(request.page_token, context)
            if request.newest_first:
                # Descending order: get items with smaller IDs
                statement = statement.where(ModerationQueueItem.id < page_token_id)
            else:
                # Ascending order: get items with larger IDs
                statement = statement.where(ModerationQueueItem.id > page_token_id)

        # Apply filters
        if request.triggers:
            # UNSPECIFIED and unknown wire values map to None; `IN (NULL)` never matches a row, so dropping
            # them yields the same result set without risking a KeyError on unknown values
            internal_triggers = [
                trigger for trigger in map(moderationtrigger2sql.get, request.triggers) if trigger is not None
            ]
            statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers))

        if request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED:
            internal_object_type = moderationobjecttype2sql.get(request.object_type)
            if internal_object_type is None:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Unknown object type.")
            statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type)

        if request.unresolved_only:
            statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None))

        if request.HasField("priority_min"):
            statement = statement.where(ModerationQueueItem.priority >= request.priority_min)

        if request.HasField("priority_max"):
            statement = statement.where(ModerationQueueItem.priority <= request.priority_max)

        if request.HasField("created_before"):
            created_before = request.created_before.ToDatetime()
            statement = statement.where(ModerationQueueItem.time_created < created_before)

        if request.HasField("created_after"):
            created_after = request.created_after.ToDatetime()
            statement = statement.where(ModerationQueueItem.time_created > created_after)

        if request.item_author_user_id:
            # Use EXISTS for efficient author filtering
            statement = statement.where(
                self._authored_by_filter(ModerationQueueItem.moderation_state_id, request.item_author_user_id)
            )

        # Order by time created
        if request.newest_first:
            statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc())
        else:
            statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc())

        # Fetch one extra row to learn whether another page exists
        queue_items = session.execute(statement.limit(page_size + 1)).scalars().all()

        # Convert to proto
        queue_items_pb = []
        for item in queue_items[:page_size]:
            # Fetch the moderation state for this queue item
            mod_state = session.execute(
                select(ModerationState).where(ModerationState.id == item.moderation_state_id)
            ).scalar_one()

            queue_items_pb.append(
                moderation_pb2.ModerationQueueItemInfo(
                    queue_item_id=item.id,
                    moderation_state_id=item.moderation_state_id,
                    time_created=Timestamp_from_datetime(item.time_created),
                    trigger=moderationtrigger2api[item.trigger],
                    reason=item.reason,
                    is_resolved=item.resolved_by_log_id is not None,
                    resolved_by_log_id=item.resolved_by_log_id or 0,
                    moderation_state=moderation_state_to_pb(mod_state, session),
                    priority=item.priority,
                )
            )

        return moderation_pb2.GetModerationQueueRes(
            queue_items=queue_items_pb,
            # Use the ID of the last returned item (not the extra fetched item) as the cursor
            next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None,
        )

    def GetModerationState(
        self, request: moderation_pb2.GetModerationStateReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationStateRes:
        """Get moderation state by object type and object ID.

        Aborts with INVALID_ARGUMENT if the object type is unspecified or unknown, and with NOT_FOUND if no
        moderation state exists for the given object.
        """
        # .get() so an unknown wire value is rejected as INVALID_ARGUMENT instead of raising KeyError
        object_type = moderationobjecttype2sql.get(request.object_type)
        if object_type is None:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.")

        moderation_state = session.execute(
            select(ModerationState)
            .where(ModerationState.object_type == object_type)
            .where(ModerationState.object_id == request.object_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        return moderation_pb2.GetModerationStateRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def GetModerationLog(
        self, request: moderation_pb2.GetModerationLogReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationLogRes:
        """Get moderation log for a specific moderation state, most recent entry first.

        Aborts with NOT_FOUND if the moderation state does not exist.
        """
        # Get the moderation state
        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        # Get all log entries for this state, ordered by time (most recent first)
        log_entries = (
            session.execute(
                select(ModerationLog)
                .where(ModerationLog.moderation_state_id == request.moderation_state_id)
                .order_by(ModerationLog.time.desc(), ModerationLog.id.desc())
            )
            .scalars()
            .all()
        )

        # Convert moderation state to proto first (while still in session)
        moderation_state_pb = moderation_state_to_pb(moderation_state, session)

        # Convert to proto
        log_entries_pb = []
        for entry in log_entries:
            log_entry_pb = moderation_pb2.ModerationLogEntryInfo(
                log_entry_id=entry.id,
                moderation_state_id=entry.moderation_state_id,
                time=Timestamp_from_datetime(entry.time),
                action=moderationaction2api[entry.action],
                moderator_user_id=entry.moderator_user_id,
                reason=entry.reason,
            )

            # Only include changed fields
            if entry.new_visibility is not None:
                log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility]
            if entry.new_priority is not None:
                log_entry_pb.new_priority = entry.new_priority
            if entry.queue_item_id is not None:
                log_entry_pb.queue_item_id = entry.queue_item_id

            log_entries_pb.append(log_entry_pb)

        return moderation_pb2.GetModerationLogRes(
            log_entries=log_entries_pb,
            moderation_state=moderation_state_pb,
        )

    def ModerateContent(
        self, request: moderation_pb2.ModerateContentReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.ModerateContentRes:
        """Single moderation entrypoint, dispatching on action.

        APPROVE/HIDE act on the state's visibility; FLAG/SET_PRIORITY/UNFLAG act on a single
        queue item. Every action appends a ModerationLog row.

        Aborts with NOT_FOUND if the moderation state or a referenced queue item does not exist, and with
        INVALID_ARGUMENT if the action, visibility, trigger or queue item id required by the action is
        missing or unknown, or if the action is not one of the supported ones.
        """
        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        # Enum lookups below use .get() so unknown wire values hit the validation aborts instead of KeyError
        action = moderationaction2sql.get(request.action)
        if action is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:action_must_be_specified")

        reason = request.reason or "Moderated by admin"
        object_type = moderation_state.object_type

        if action in (ModerationAction.approve, ModerationAction.hide):
            new_visibility = moderationvisibility2sql.get(request.visibility)
            if new_visibility is None:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:visibility_must_be_specified")

            old_visibility = moderation_state.visibility
            moderation_state.visibility = new_visibility
            moderation_state.updated = now()

            log_entry = ModerationLog(
                moderation_state_id=moderation_state.id,
                action=action,
                moderator_user_id=context.user_id,
                new_visibility=new_visibility,
                reason=reason,
            )
            session.add(log_entry)
            # Flush so the log entry has an id before queue items are pointed at it
            session.flush()

            if request.clear_flags:
                open_items = (
                    session.execute(
                        select(ModerationQueueItem)
                        .where(ModerationQueueItem.moderation_state_id == moderation_state.id)
                        .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                    )
                    .scalars()
                    .all()
                )
                for queue_item in open_items:
                    _resolve_queue_item(queue_item, log_entry, action, object_type)
                session.flush()

            observe_moderation_action(action, object_type)
            observe_moderation_visibility_transition(old_visibility, new_visibility, object_type)

            if new_visibility in (ModerationVisibility.visible, ModerationVisibility.unlisted):
                _enqueue_pending_notifications(session, moderation_state.id)

        elif action == ModerationAction.flag:
            trigger = moderationtrigger2sql.get(request.trigger)
            if trigger is None:
                context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:trigger_must_be_specified")

            queue_item = ModerationQueueItem(
                moderation_state_id=moderation_state.id,
                trigger=trigger,
                reason=reason,
                priority=request.priority,
            )
            session.add(queue_item)
            # Flush so the new queue item has an id for the log entry to reference
            session.flush()

            log_entry = ModerationLog(
                moderation_state_id=moderation_state.id,
                action=ModerationAction.flag,
                moderator_user_id=context.user_id,
                queue_item_id=queue_item.id,
                reason=reason,
            )
            session.add(log_entry)
            session.flush()

            if request.supersede_queue_item_id:
                # The superseded item must belong to the same moderation state
                superseded = session.execute(
                    select(ModerationQueueItem)
                    .where(ModerationQueueItem.id == request.supersede_queue_item_id)
                    .where(ModerationQueueItem.moderation_state_id == moderation_state.id)
                ).scalar_one_or_none()
                if superseded is None:
                    context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:queue_item_not_found")
                if superseded.resolved_by_log_id is None:
                    _resolve_queue_item(superseded, log_entry, ModerationAction.flag, object_type)
                    session.flush()

            observe_moderation_action(ModerationAction.flag, object_type)
            observe_moderation_queue_item_created(trigger, object_type)

        elif action == ModerationAction.set_priority:
            queue_item = self._get_queue_item_for_state(request, context, session, moderation_state.id)
            queue_item.priority = request.priority

            log_entry = ModerationLog(
                moderation_state_id=moderation_state.id,
                action=ModerationAction.set_priority,
                moderator_user_id=context.user_id,
                queue_item_id=queue_item.id,
                new_priority=request.priority,
                reason=reason,
            )
            session.add(log_entry)
            session.flush()

            observe_moderation_action(ModerationAction.set_priority, object_type)

        elif action == ModerationAction.unflag:
            queue_item = self._get_queue_item_for_state(request, context, session, moderation_state.id)
            moderation_state.updated = now()

            log_entry = ModerationLog(
                moderation_state_id=moderation_state.id,
                action=ModerationAction.unflag,
                moderator_user_id=context.user_id,
                queue_item_id=queue_item.id,
                reason=reason,
            )
            session.add(log_entry)
            session.flush()

            # Already-resolved items keep their original resolving log entry
            if queue_item.resolved_by_log_id is None:
                _resolve_queue_item(queue_item, log_entry, ModerationAction.unflag, object_type)
                session.flush()

            observe_moderation_action(ModerationAction.unflag, object_type)

        else:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:unsupported_action")

        return moderation_pb2.ModerateContentRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def _get_queue_item_for_state(
        self,
        request: moderation_pb2.ModerateContentReq,
        context: CouchersContext,
        session: Session,
        moderation_state_id: int,
    ) -> ModerationQueueItem:
        """Resolve the request's target queue item, asserting it belongs to the given state.

        Aborts with INVALID_ARGUMENT if the request carries no queue item id, and with NOT_FOUND if no queue
        item with that id exists on the given moderation state.
        """
        if not request.queue_item_id:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:queue_item_id_must_be_specified")
        queue_item = session.execute(
            select(ModerationQueueItem)
            .where(ModerationQueueItem.id == request.queue_item_id)
            .where(ModerationQueueItem.moderation_state_id == moderation_state_id)
        ).scalar_one_or_none()
        if queue_item is None:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:queue_item_not_found")
        return queue_item

    def SetUserContentVisibility(
        self, request: moderation_pb2.SetUserContentVisibilityReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.SetUserContentVisibilityRes:
        """Bulk-set visibility on every UMS-governed object authored by the given user.

        If from_visibility is non-empty, only states currently at one of those visibilities are swept.

        Aborts with INVALID_ARGUMENT if the target visibility or any from_visibility entry is unspecified or
        unknown, and with NOT_FOUND if the user does not exist. The sweep is also recorded as an admin action.
        """
        # .get() so an unknown wire value is rejected as INVALID_ARGUMENT instead of raising KeyError
        new_visibility = moderationvisibility2sql.get(request.visibility)
        if new_visibility is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:visibility_must_be_specified")

        raw_from_visibilities = {moderationvisibility2sql.get(v) for v in request.from_visibility}
        if None in raw_from_visibilities:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:visibility_must_be_specified")
        # An empty filter is passed on as None
        from_visibilities: set[ModerationVisibility] | None = {
            v for v in raw_from_visibilities if v is not None
        } or None

        user = session.execute(select(User).where(User.id == request.user_id)).scalar_one_or_none()
        if not user:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        updated_count = bulk_set_user_content_visibility(
            session=session,
            user=user,
            new_visibility=new_visibility,
            moderator_user_id=context.user_id,
            from_visibilities=from_visibilities,
            reason=request.reason or None,
        )

        # Import here to avoid circular dependency
        from couchers.servicers.admin import log_admin_action  # noqa: PLC0415

        log_admin_action(
            session,
            context,
            user,
            "set_user_content_visibility",
            note=request.reason or None,
            tag=new_visibility.name,
            level=AdminActionLevel.high,
        )

        return moderation_pb2.SetUserContentVisibilityRes(updated_count=updated_count)

    def ListModerationStates(
        self, request: moderation_pb2.ListModerationStatesReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.ListModerationStatesRes:
        """Chronological, paginated list of ModerationState rows. Optional author_user_id filter.

        Aborts with INVALID_ARGUMENT on a malformed page token.
        """
        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

        statement = select(ModerationState)

        # NOTE(review): the cursor filters on id alone while rows are ordered by (created, id); this is only a
        # consistent keyset if ids increase together with created -- confirm.
        if request.page_token:
            page_token_id = self._parse_page_token(request.page_token, context)
            if request.newest_first:
                statement = statement.where(ModerationState.id < page_token_id)
            else:
                statement = statement.where(ModerationState.id > page_token_id)

        if request.author_user_id:
            statement = statement.where(self._authored_by_filter(ModerationState.id, request.author_user_id))

        if request.newest_first:
            statement = statement.order_by(ModerationState.created.desc(), ModerationState.id.desc())
        else:
            statement = statement.order_by(ModerationState.created.asc(), ModerationState.id.asc())

        # Fetch one extra row to learn whether another page exists
        states = session.execute(statement.limit(page_size + 1)).scalars().all()

        state_pbs = [moderation_state_to_pb(state, session) for state in states[:page_size]]

        return moderation_pb2.ListModerationStatesRes(
            moderation_states=state_pbs,
            # Cursor is the id of the last returned state, not the extra fetched one
            next_page_token=str(states[page_size - 1].id) if len(states) > page_size else None,
        )