Coverage for app / backend / src / couchers / servicers / moderation.py: 83%
164 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1import logging
2from typing import TYPE_CHECKING
4import grpc
5from sqlalchemy import and_, exists, not_, or_, select
6from sqlalchemy.orm import Session
8from couchers.context import CouchersContext
9from couchers.jobs.enqueue import queue_job
10from couchers.metrics import (
11 observe_moderation_action,
12 observe_moderation_queue_item_created,
13 observe_moderation_queue_item_resolved,
14 observe_moderation_queue_resolution_time,
15 observe_moderation_visibility_transition,
16)
17from couchers.models import (
18 Event,
19 EventOccurrence,
20 FriendRelationship,
21 GroupChat,
22 HostRequest,
23 Message,
24 MessageType,
25 ModerationAction,
26 ModerationLog,
27 ModerationObjectType,
28 ModerationQueueItem,
29 ModerationState,
30 ModerationTrigger,
31 ModerationVisibility,
32 Notification,
33 NotificationDelivery,
34)
35from couchers.proto import moderation_pb2, moderation_pb2_grpc
36from couchers.proto.internal import jobs_pb2
37from couchers.utils import Timestamp_from_datetime, now
39if TYPE_CHECKING:
40 from couchers.sql import _ModeratedContent
logger = logging.getLogger(__name__)

# Upper bound on the number of items a paginated endpoint returns in one page
MAX_PAGINATION_LENGTH = 1_000

# Moderation enum mappings
#
# Every "<name>2api" table translates an internal enum member (or None) into the matching
# protobuf enum value. Every "<name>2sql" table is the exact inverse of its "2api" partner,
# so it is derived from it rather than spelled out a second time.
moderationvisibility2api = {
    None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED,
    ModerationVisibility.hidden: moderation_pb2.MODERATION_VISIBILITY_HIDDEN,
    ModerationVisibility.shadowed: moderation_pb2.MODERATION_VISIBILITY_SHADOWED,
    ModerationVisibility.visible: moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
    ModerationVisibility.unlisted: moderation_pb2.MODERATION_VISIBILITY_UNLISTED,
}

moderationvisibility2sql = {api_value: sql_value for sql_value, api_value in moderationvisibility2api.items()}

moderationtrigger2api = {
    None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED,
    ModerationTrigger.initial_review: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW,
    ModerationTrigger.user_flag: moderation_pb2.MODERATION_TRIGGER_USER_FLAG,
    ModerationTrigger.machine_flag: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG,
    ModerationTrigger.moderator_review: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW,
}

moderationtrigger2sql = {api_value: sql_value for sql_value, api_value in moderationtrigger2api.items()}

moderationaction2api = {
    None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED,
    ModerationAction.create: moderation_pb2.MODERATION_ACTION_CREATE,
    ModerationAction.approve: moderation_pb2.MODERATION_ACTION_APPROVE,
    ModerationAction.hide: moderation_pb2.MODERATION_ACTION_HIDE,
    ModerationAction.flag: moderation_pb2.MODERATION_ACTION_FLAG,
    ModerationAction.unflag: moderation_pb2.MODERATION_ACTION_UNFLAG,
}

moderationaction2sql = {api_value: sql_value for sql_value, api_value in moderationaction2api.items()}

moderationobjecttype2api = {
    None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED,
    ModerationObjectType.host_request: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST,
    ModerationObjectType.group_chat: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT,
    ModerationObjectType.friend_request: moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST,
    ModerationObjectType.event_occurrence: moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE,
}

moderationobjecttype2sql = {api_value: sql_value for sql_value, api_value in moderationobjecttype2api.items()}

# Mapping from ModerationObjectType to the SQLAlchemy model class
moderationobjecttype2model: dict[ModerationObjectType, _ModeratedContent] = {
    ModerationObjectType.host_request: HostRequest,
    ModerationObjectType.group_chat: GroupChat,
    ModerationObjectType.friend_request: FriendRelationship,
    ModerationObjectType.event_occurrence: EventOccurrence,
}
def moderation_state_to_pb(state: ModerationState, session: Session) -> moderation_pb2.ModerationStateInfo:
    """Build the ModerationStateInfo proto message for a ModerationState row.

    The author user ID and the text content are read from the moderated object the state points at:

    * host request: the surfer's user ID and the first text message of the conversation
    * group chat: the creator's user ID and the first text message of the conversation
    * friend request: the sending user's ID, no content
    * event occurrence: the creator's user ID and "<event title>\\n\\n<occurrence content>"

    Missing content is sent as an empty string.

    Raises:
        ValueError: if the state's object type is none of the four types above.
    """
    kind = state.object_type
    target_id = state.object_id

    def first_text_message():
        """Return the text of the lowest-ID text message in conversation ``target_id``, or None."""
        stmt = (
            select(Message.text)
            .where(Message.conversation_id == target_id)
            .where(Message.message_type == MessageType.text)
            .order_by(Message.id.asc())
            .limit(1)
        )
        return session.execute(stmt).scalar_one_or_none()

    if kind == ModerationObjectType.host_request:
        author_stmt = select(HostRequest.surfer_user_id).where(HostRequest.conversation_id == target_id)
        author_user_id = session.execute(author_stmt).scalar_one()
        content = first_text_message()
    elif kind == ModerationObjectType.group_chat:
        author_stmt = select(GroupChat.creator_id).where(GroupChat.conversation_id == target_id)
        author_user_id = session.execute(author_stmt).scalar_one()
        content = first_text_message()
    elif kind == ModerationObjectType.friend_request:
        author_stmt = select(FriendRelationship.from_user_id).where(FriendRelationship.id == target_id)
        author_user_id = session.execute(author_stmt).scalar_one()
        # Friend requests have no text content
        content = None
    elif kind == ModerationObjectType.event_occurrence:
        event_stmt = (
            select(EventOccurrence.creator_user_id, Event.title, EventOccurrence.content)
            .join(Event, Event.id == EventOccurrence.event_id)
            .where(EventOccurrence.id == target_id)
        )
        author_user_id, title, description = session.execute(event_stmt).one()
        content = f"{title}\n\n{description}"
    else:
        raise ValueError(f"Unsupported moderation object type: {kind}")

    return moderation_pb2.ModerationStateInfo(
        moderation_state_id=state.id,
        object_type=moderationobjecttype2api[state.object_type],
        object_id=state.object_id,
        visibility=moderationvisibility2api[state.visibility],
        created=Timestamp_from_datetime(state.created),
        updated=Timestamp_from_datetime(state.updated),
        author_user_id=author_user_id,
        content=content or "",
    )
class Moderation(moderation_pb2_grpc.ModerationServicer):
    """gRPC servicer for the moderation queue, moderation states, the moderation log and moderation actions."""

    @staticmethod
    def _resolve_pending_queue_items(
        session: Session, moderation_state: ModerationState, log_entry: ModerationLog, action: ModerationAction
    ) -> None:
        """Mark every unresolved queue item of ``moderation_state`` as resolved by ``log_entry``.

        A state can accumulate several unresolved queue items (FlagContentForReview adds one per call),
        so all of them are resolved here instead of expecting at most one row. ``log_entry`` must
        already be flushed so that its ``id`` is populated. Resolution metrics are recorded once per
        resolved item, labelled with ``action``.
        """
        pending_items = (
            session.execute(
                select(ModerationQueueItem)
                .where(ModerationQueueItem.moderation_state_id == moderation_state.id)
                .where(ModerationQueueItem.resolved_by_log_id.is_(None))
                .order_by(ModerationQueueItem.time_created.desc())
            )
            .scalars()
            .all()
        )
        if not pending_items:
            return

        for queue_item in pending_items:
            queue_item.resolved_by_log_id = log_entry.id
        session.flush()

        resolved_at = now()
        for queue_item in pending_items:
            observe_moderation_queue_item_resolved(queue_item.trigger, action, moderation_state.object_type)
            observe_moderation_queue_resolution_time(
                queue_item.trigger,
                action,
                moderation_state.object_type,
                (resolved_at - queue_item.time_created).total_seconds(),
            )

    def GetModerationQueue(
        self, request: moderation_pb2.GetModerationQueueReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationQueueRes:
        """Get moderation queue items with optional filtering

        Supported filters: triggers, object type, unresolved only, created before/after and author
        user ID. Results are ordered by creation time (then ID), newest or oldest first depending on
        ``request.newest_first``. At most ``MAX_PAGINATION_LENGTH`` items are returned per page.

        Aborts with INVALID_ARGUMENT if ``request.page_token`` is not an integer.
        """

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

        # Build query
        statement = select(ModerationQueueItem)

        # Apply page token filter based on ordering direction
        # NOTE(review): the cursor filters on ID while the ordering is by time_created first; this
        # presumably relies on IDs and creation times increasing together -- confirm.
        if request.page_token:
            try:
                page_token_id = int(request.page_token)
            except ValueError:
                # A malformed token is a client error, not an internal one
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid page token.")
            if request.newest_first:
                # Descending order: get items with smaller IDs
                statement = statement.where(ModerationQueueItem.id < page_token_id)
            else:
                # Ascending order: get items with larger IDs
                statement = statement.where(ModerationQueueItem.id > page_token_id)

        # Apply filters
        if request.triggers:
            internal_triggers = [moderationtrigger2sql[t] for t in request.triggers]
            statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers))

        if request.object_type and request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED:
            internal_object_type = moderationobjecttype2sql[request.object_type]
            if internal_object_type:
                statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type)

        if request.unresolved_only:
            statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None))

        if request.HasField("created_before"):
            created_before = request.created_before.ToDatetime()
            statement = statement.where(ModerationQueueItem.time_created < created_before)

        if request.HasField("created_after"):
            created_after = request.created_after.ToDatetime()
            statement = statement.where(ModerationQueueItem.time_created > created_after)

        if request.item_author_user_id:
            author_user_id = request.item_author_user_id

            # Use EXISTS for efficient author filtering: an item matches if any moderated model has a
            # row attached to the same moderation state whose author column equals the requested user
            author_exists_clauses = []
            for model in moderationobjecttype2model.values():
                author_col = getattr(model, model.__moderation_author_column__)
                author_exists_clauses.append(
                    exists().where(
                        and_(
                            model.moderation_state_id == ModerationQueueItem.moderation_state_id,
                            author_col == author_user_id,
                        )
                    )
                )
            statement = statement.where(or_(*author_exists_clauses))

        # Order by time created
        if request.newest_first:
            statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc())
        else:
            statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc())

        # Fetch one extra row so we can tell whether another page exists
        queue_items = session.execute(statement.limit(page_size + 1)).scalars().all()
        page_items = queue_items[:page_size]

        # Load the moderation states of the whole page in one query instead of one query per item
        states_by_id = {}
        if page_items:
            state_ids = list({item.moderation_state_id for item in page_items})
            states_by_id = {
                mod_state.id: mod_state
                for mod_state in session.execute(
                    select(ModerationState).where(ModerationState.id.in_(state_ids))
                ).scalars()
            }

        # Convert to proto
        queue_items_pb = [
            moderation_pb2.ModerationQueueItemInfo(
                queue_item_id=item.id,
                moderation_state_id=item.moderation_state_id,
                time_created=Timestamp_from_datetime(item.time_created),
                trigger=moderationtrigger2api[item.trigger],
                reason=item.reason,
                is_resolved=item.resolved_by_log_id is not None,
                resolved_by_log_id=item.resolved_by_log_id or 0,
                moderation_state=moderation_state_to_pb(states_by_id[item.moderation_state_id], session),
            )
            for item in page_items
        ]

        return moderation_pb2.GetModerationQueueRes(
            queue_items=queue_items_pb,
            # Use the ID of the last returned item (not the extra fetched item) as the cursor
            next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None,
        )

    def GetModerationState(
        self, request: moderation_pb2.GetModerationStateReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationStateRes:
        """Get moderation state by object type and object ID

        Aborts with INVALID_ARGUMENT if the object type is unspecified or unrecognised, and with
        NOT_FOUND if no moderation state exists for the given object.
        """
        # .get() so that an enum value this server does not know is rejected like an unspecified one
        object_type = moderationobjecttype2sql.get(request.object_type)
        if object_type is None:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.")

        moderation_state = session.execute(
            select(ModerationState)
            .where(ModerationState.object_type == object_type)
            .where(ModerationState.object_id == request.object_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        return moderation_pb2.GetModerationStateRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def GetModerationLog(
        self, request: moderation_pb2.GetModerationLogReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationLogRes:
        """Get moderation log for a specific moderation state

        Returns the state itself together with all of its log entries, most recent first.
        Aborts with NOT_FOUND if the moderation state does not exist.
        """
        # Get the moderation state
        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        # Get all log entries for this state, ordered by time (most recent first)
        log_entries = (
            session.execute(
                select(ModerationLog)
                .where(ModerationLog.moderation_state_id == request.moderation_state_id)
                .order_by(ModerationLog.time.desc(), ModerationLog.id.desc())
            )
            .scalars()
            .all()
        )

        # Convert moderation state to proto first (while still in session)
        moderation_state_pb = moderation_state_to_pb(moderation_state, session)

        # Convert to proto
        log_entries_pb = []
        for entry in log_entries:
            log_entry_pb = moderation_pb2.ModerationLogEntryInfo(
                log_entry_id=entry.id,
                moderation_state_id=entry.moderation_state_id,
                time=Timestamp_from_datetime(entry.time),
                action=moderationaction2api[entry.action],
                moderator_user_id=entry.moderator_user_id,
                reason=entry.reason,
            )

            # Only include changed fields
            if entry.new_visibility is not None:
                log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility]

            log_entries_pb.append(log_entry_pb)

        return moderation_pb2.GetModerationLogRes(
            log_entries=log_entries_pb,
            moderation_state=moderation_state_pb,
        )

    def ModerateContent(
        self, request: moderation_pb2.ModerateContentReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.ModerateContentRes:
        """Unified moderation action - takes both action and visibility explicitly

        Sets the new visibility on the moderation state, writes a log entry attributed to the
        calling user, resolves all pending queue items for the state and, when the content becomes
        visible or unlisted, queues delivery of the notifications that were held back for it.

        Aborts with NOT_FOUND if the moderation state does not exist and with INVALID_ARGUMENT if
        the action or the visibility is unspecified or unrecognised.
        """

        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        # Convert proto enums to internal enums; .get() so unknown enum values are rejected below
        # instead of raising KeyError
        action = moderationaction2sql.get(request.action)
        if action is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "action_must_be_specified")

        new_visibility = moderationvisibility2sql.get(request.visibility)
        if new_visibility is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "visibility_must_be_specified")

        reason = request.reason or "Moderated by admin"

        # Track old visibility for metrics
        old_visibility = moderation_state.visibility

        # Update visibility
        moderation_state.visibility = new_visibility
        moderation_state.updated = now()

        # Log the action
        log_entry = ModerationLog(
            moderation_state_id=moderation_state.id,
            action=action,
            moderator_user_id=context.user_id,
            new_visibility=new_visibility,
            reason=reason,
        )
        session.add(log_entry)
        # Flush so that log_entry.id is available for the queue items to point at
        session.flush()

        # Resolve any pending queue items
        self._resolve_pending_queue_items(session, moderation_state, log_entry, action)

        observe_moderation_action(action, moderation_state.object_type)
        observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type)

        # If visibility becomes VISIBLE or UNLISTED, trigger pending notifications
        if new_visibility in (ModerationVisibility.visible, ModerationVisibility.unlisted):
            # Notifications tied to this state that have no delivery row yet
            pending_notifications = (
                session.execute(
                    select(Notification)
                    .where(Notification.moderation_state_id == moderation_state.id)
                    .where(not_(exists().where(NotificationDelivery.notification_id == Notification.id)))
                )
                .scalars()
                .all()
            )

            # Import here to avoid circular dependency
            from couchers.notifications.background import handle_notification

            for notification in pending_notifications:
                queue_job(
                    session,
                    job=handle_notification,
                    payload=jobs_pb2.HandleNotificationPayload(notification_id=notification.id),
                )

        return moderation_pb2.ModerateContentRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def FlagContentForReview(
        self, request: moderation_pb2.FlagContentForReviewReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.FlagContentForReviewRes:
        """Flag content for review by adding it to the moderation queue

        An unspecified trigger falls back to ``initial_review`` and an empty reason to a default
        text. Aborts with NOT_FOUND if the moderation state does not exist.
        """

        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if not moderation_state:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found")

        trigger = moderationtrigger2sql[request.trigger] or ModerationTrigger.initial_review
        reason = request.reason or "Flagged by admin for review"

        # Add to moderation queue
        queue_item = ModerationQueueItem(
            moderation_state_id=request.moderation_state_id,
            trigger=trigger,
            reason=reason,
        )
        session.add(queue_item)
        # Flush so the new queue item's generated fields can be read back below
        session.flush()

        observe_moderation_action(ModerationAction.flag, moderation_state.object_type)
        observe_moderation_queue_item_created(trigger, moderation_state.object_type)

        queue_item_pb = moderation_pb2.ModerationQueueItemInfo(
            queue_item_id=queue_item.id,
            moderation_state_id=queue_item.moderation_state_id,
            time_created=Timestamp_from_datetime(queue_item.time_created),
            trigger=moderationtrigger2api[queue_item.trigger],
            reason=queue_item.reason,
            is_resolved=False,
            resolved_by_log_id=0,
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

        return moderation_pb2.FlagContentForReviewRes(queue_item=queue_item_pb)

    def UnflagContent(
        self, request: moderation_pb2.UnflagContentReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.UnflagContentRes:
        """Unflag content by resolving pending queue items

        Writes an ``unflag`` log entry (visibility is left unchanged) and resolves all pending
        queue items for the state. Aborts with NOT_FOUND if the moderation state does not exist.
        """

        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if not moderation_state:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found")

        reason = request.reason or "Unflagged by admin"

        # Update moderation state
        moderation_state.updated = now()

        # Log the unflag action
        log_entry = ModerationLog(
            moderation_state_id=moderation_state.id,
            action=ModerationAction.unflag,
            moderator_user_id=context.user_id,
            new_visibility=None,
            reason=reason,
        )
        session.add(log_entry)
        # Flush so that log_entry.id is available for the queue items to point at
        session.flush()

        # Resolve any pending queue items
        self._resolve_pending_queue_items(session, moderation_state, log_entry, ModerationAction.unflag)

        observe_moderation_action(ModerationAction.unflag, moderation_state.object_type)

        return moderation_pb2.UnflagContentRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )