Coverage for app / backend / src / couchers / servicers / moderation.py: 82%
153 statements
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
« prev ^ index » next coverage.py v7.13.2, created at 2026-02-03 06:18 +0000
1import logging
3import grpc
4from sqlalchemy import and_, exists, not_, or_, select
5from sqlalchemy.orm import Session
7from couchers.context import CouchersContext
8from couchers.jobs.enqueue import queue_job
9from couchers.metrics import (
10 observe_moderation_action,
11 observe_moderation_queue_item_created,
12 observe_moderation_queue_item_resolved,
13 observe_moderation_queue_resolution_time,
14 observe_moderation_visibility_transition,
15)
16from couchers.models import (
17 GroupChat,
18 HostRequest,
19 Message,
20 MessageType,
21 ModerationAction,
22 ModerationLog,
23 ModerationObjectType,
24 ModerationQueueItem,
25 ModerationState,
26 ModerationTrigger,
27 ModerationVisibility,
28 Notification,
29 NotificationDelivery,
30)
31from couchers.proto import moderation_pb2, moderation_pb2_grpc
32from couchers.proto.internal import jobs_pb2
33from couchers.utils import Timestamp_from_datetime, now
logger = logging.getLogger(__name__)

# Upper bound (and default) for the page size accepted by GetModerationQueue.
MAX_PAGINATION_LENGTH = 1_000

# Moderation enum mappings
#
# Each `*2api` table maps an internal (SQL) enum member to its proto enum value,
# with `None` standing for the proto `*_UNSPECIFIED` value. Each `*2sql` table is
# the exact inverse and is derived from its `*2api` counterpart so the two
# directions cannot drift apart.
moderationvisibility2api = {
    None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED,
    ModerationVisibility.HIDDEN: moderation_pb2.MODERATION_VISIBILITY_HIDDEN,
    ModerationVisibility.SHADOWED: moderation_pb2.MODERATION_VISIBILITY_SHADOWED,
    ModerationVisibility.VISIBLE: moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
    ModerationVisibility.UNLISTED: moderation_pb2.MODERATION_VISIBILITY_UNLISTED,
}
moderationvisibility2sql = {api_value: sql_value for sql_value, api_value in moderationvisibility2api.items()}

moderationtrigger2api = {
    None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED,
    ModerationTrigger.INITIAL_REVIEW: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW,
    ModerationTrigger.USER_FLAG: moderation_pb2.MODERATION_TRIGGER_USER_FLAG,
    ModerationTrigger.MACHINE_FLAG: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG,
    ModerationTrigger.MODERATOR_REVIEW: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW,
}
moderationtrigger2sql = {api_value: sql_value for sql_value, api_value in moderationtrigger2api.items()}

moderationaction2api = {
    None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED,
    ModerationAction.CREATE: moderation_pb2.MODERATION_ACTION_CREATE,
    ModerationAction.APPROVE: moderation_pb2.MODERATION_ACTION_APPROVE,
    ModerationAction.HIDE: moderation_pb2.MODERATION_ACTION_HIDE,
    ModerationAction.FLAG: moderation_pb2.MODERATION_ACTION_FLAG,
    ModerationAction.UNFLAG: moderation_pb2.MODERATION_ACTION_UNFLAG,
}
moderationaction2sql = {api_value: sql_value for sql_value, api_value in moderationaction2api.items()}

moderationobjecttype2api = {
    None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED,
    ModerationObjectType.HOST_REQUEST: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST,
    ModerationObjectType.GROUP_CHAT: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT,
}
moderationobjecttype2sql = {api_value: sql_value for sql_value, api_value in moderationobjecttype2api.items()}
def moderation_state_to_pb(state: ModerationState, session: Session) -> moderation_pb2.ModerationStateInfo:
    """Build the ``ModerationStateInfo`` proto for a ``ModerationState`` row.

    Besides the state's own columns, the proto carries the author's user ID
    (the surfer for a host request, the creator for a group chat) and the text
    of the first text message in the moderated conversation (empty string if
    there is none).

    Raises:
        ValueError: if the state's object type is neither a host request nor a
            group chat.
    """
    kind = state.object_type
    conversation_id = state.object_id

    # The author lives in a different table/column depending on the object type.
    if kind == ModerationObjectType.HOST_REQUEST:
        author_query = select(HostRequest.surfer_user_id).where(HostRequest.conversation_id == conversation_id)
    elif kind == ModerationObjectType.GROUP_CHAT:
        author_query = select(GroupChat.creator_id).where(GroupChat.conversation_id == conversation_id)
    else:
        raise ValueError(f"Unsupported moderation object type: {kind}")
    author_user_id = session.execute(author_query).scalar_one()

    # Earliest (lowest ID) text message of the conversation, if any.
    first_text_query = (
        select(Message.text)
        .where(
            Message.conversation_id == conversation_id,
            Message.message_type == MessageType.text,
        )
        .order_by(Message.id.asc())
        .limit(1)
    )
    first_text = session.execute(first_text_query).scalar_one_or_none()

    return moderation_pb2.ModerationStateInfo(
        moderation_state_id=state.id,
        object_type=moderationobjecttype2api[kind],
        object_id=conversation_id,
        visibility=moderationvisibility2api[state.visibility],
        created=Timestamp_from_datetime(state.created),
        updated=Timestamp_from_datetime(state.updated),
        author_user_id=author_user_id,
        content=first_text or "",
    )
def _resolve_pending_queue_items(
    session: Session,
    moderation_state: ModerationState,
    log_entry: ModerationLog,
    action: ModerationAction,
) -> None:
    """Mark every unresolved queue item of ``moderation_state`` as resolved by ``log_entry``.

    A moderation state can accumulate several unresolved queue items (each
    ``FlagContentForReview`` call adds one), so all of them are fetched and
    resolved rather than assuming there is at most one. Resolution metrics are
    emitted once per resolved item, after the changes have been flushed.
    """
    pending_items = (
        session.execute(
            select(ModerationQueueItem)
            .where(ModerationQueueItem.moderation_state_id == moderation_state.id)
            .where(ModerationQueueItem.resolved_by_log_id.is_(None))
            .order_by(ModerationQueueItem.time_created.desc())
        )
        .scalars()
        .all()
    )
    if not pending_items:
        return

    for queue_item in pending_items:
        queue_item.resolved_by_log_id = log_entry.id
    session.flush()

    resolved_at = now()
    for queue_item in pending_items:
        observe_moderation_queue_item_resolved(queue_item.trigger, action, moderation_state.object_type)
        observe_moderation_queue_resolution_time(
            queue_item.trigger,
            action,
            moderation_state.object_type,
            (resolved_at - queue_item.time_created).total_seconds(),
        )


class Moderation(moderation_pb2_grpc.ModerationServicer):
    """gRPC servicer exposing the moderation queue, state, log and moderation actions."""

    def GetModerationQueue(
        self, request: moderation_pb2.GetModerationQueueReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationQueueRes:
        """Get moderation queue items with optional filtering.

        Supports filtering by trigger, object type, resolution status, creation
        time window and item author, and cursor pagination via ``page_token``
        (the ID of the last item of the previous page).

        Aborts with INVALID_ARGUMENT if ``page_token`` is not an integer.
        """
        # Missing (0), non-positive or oversized page sizes all fall back to the maximum.
        requested_size = request.page_size
        page_size = requested_size if 0 < requested_size <= MAX_PAGINATION_LENGTH else MAX_PAGINATION_LENGTH

        # Build query
        statement = select(ModerationQueueItem)

        # Apply page token filter based on ordering direction
        # NOTE(review): the cursor is the item ID only while the ordering is
        # (time_created, id); this is only consistent if IDs increase with
        # time_created — confirm.
        if request.page_token:
            try:
                page_token_id = int(request.page_token)
            except ValueError:
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid page token.")
            if request.newest_first:
                # Descending order: get items with smaller IDs
                statement = statement.where(ModerationQueueItem.id < page_token_id)
            else:
                # Ascending order: get items with larger IDs
                statement = statement.where(ModerationQueueItem.id > page_token_id)

        # Apply filters
        if request.triggers:
            internal_triggers = [moderationtrigger2sql[t] for t in request.triggers]
            statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers))

        if request.object_type and request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED:
            internal_object_type = moderationobjecttype2sql[request.object_type]
            if internal_object_type:
                statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type)

        if request.unresolved_only:
            statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None))

        if request.HasField("created_before"):
            created_before = request.created_before.ToDatetime()
            statement = statement.where(ModerationQueueItem.time_created < created_before)

        if request.HasField("created_after"):
            created_after = request.created_after.ToDatetime()
            statement = statement.where(ModerationQueueItem.time_created > created_after)

        if request.item_author_user_id:
            author_user_id = request.item_author_user_id

            # Use EXISTS for efficient author filtering: the author is the surfer
            # of a host request or the creator of a group chat.
            hr_exists = exists().where(
                and_(
                    HostRequest.moderation_state_id == ModerationQueueItem.moderation_state_id,
                    HostRequest.surfer_user_id == author_user_id,
                )
            )
            gc_exists = exists().where(
                and_(
                    GroupChat.moderation_state_id == ModerationQueueItem.moderation_state_id,
                    GroupChat.creator_id == author_user_id,
                )
            )
            statement = statement.where(or_(hr_exists, gc_exists))

        # Order by time created
        if request.newest_first:
            statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc())
        else:
            statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc())

        # Fetch one extra row so we can tell whether another page exists.
        queue_items = session.execute(statement.limit(page_size + 1)).scalars().all()

        # Convert to proto
        queue_items_pb = []
        for item in queue_items[:page_size]:
            # Fetch the moderation state for this queue item
            mod_state = session.execute(
                select(ModerationState).where(ModerationState.id == item.moderation_state_id)
            ).scalar_one()

            queue_item_pb = moderation_pb2.ModerationQueueItemInfo(
                queue_item_id=item.id,
                moderation_state_id=item.moderation_state_id,
                time_created=Timestamp_from_datetime(item.time_created),
                trigger=moderationtrigger2api[item.trigger],
                reason=item.reason,
                is_resolved=item.resolved_by_log_id is not None,
                resolved_by_log_id=item.resolved_by_log_id or 0,
                moderation_state=moderation_state_to_pb(mod_state, session),
            )

            queue_items_pb.append(queue_item_pb)

        return moderation_pb2.GetModerationQueueRes(
            queue_items=queue_items_pb,
            # Use the ID of the last returned item (not the extra fetched item) as the cursor
            next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None,
        )

    def GetModerationState(
        self, request: moderation_pb2.GetModerationStateReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationStateRes:
        """Get moderation state by object type and object ID.

        Aborts with INVALID_ARGUMENT if the object type is unspecified or not a
        known value, and with NOT_FOUND if no matching moderation state exists.
        """
        # .get() so that an enum number this server does not know is rejected
        # the same way as an unspecified one, instead of raising KeyError.
        object_type = moderationobjecttype2sql.get(request.object_type)
        if object_type is None:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.")

        moderation_state = session.execute(
            select(ModerationState)
            .where(ModerationState.object_type == object_type)
            .where(ModerationState.object_id == request.object_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        return moderation_pb2.GetModerationStateRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def GetModerationLog(
        self, request: moderation_pb2.GetModerationLogReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationLogRes:
        """Get moderation log for a specific moderation state.

        Returns the state itself plus all of its log entries, most recent
        first. Aborts with NOT_FOUND if the moderation state does not exist.
        """
        # Get the moderation state
        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        # Get all log entries for this state, ordered by time (most recent first)
        log_entries = (
            session.execute(
                select(ModerationLog)
                .where(ModerationLog.moderation_state_id == request.moderation_state_id)
                .order_by(ModerationLog.time.desc(), ModerationLog.id.desc())
            )
            .scalars()
            .all()
        )

        # Convert moderation state to proto first (while still in session)
        moderation_state_pb = moderation_state_to_pb(moderation_state, session)

        # Convert to proto
        log_entries_pb = []
        for entry in log_entries:
            log_entry_pb = moderation_pb2.ModerationLogEntryInfo(
                log_entry_id=entry.id,
                moderation_state_id=entry.moderation_state_id,
                time=Timestamp_from_datetime(entry.time),
                action=moderationaction2api[entry.action],
                moderator_user_id=entry.moderator_user_id,
                reason=entry.reason,
            )

            # Only include changed fields
            if entry.new_visibility is not None:
                log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility]

            log_entries_pb.append(log_entry_pb)

        return moderation_pb2.GetModerationLogRes(
            log_entries=log_entries_pb,
            moderation_state=moderation_state_pb,
        )

    def ModerateContent(
        self, request: moderation_pb2.ModerateContentReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.ModerateContentRes:
        """Unified moderation action - takes both action and visibility explicitly.

        Sets the new visibility on the moderation state, writes a log entry,
        resolves all pending queue items for the state, and, when the content
        becomes VISIBLE or UNLISTED, queues delivery of notifications attached
        to the state that have no delivery yet.

        Aborts with NOT_FOUND if the state does not exist, and with
        INVALID_ARGUMENT if the action or visibility is unspecified or unknown.
        """
        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        # Convert proto enums to internal enums; unknown enum numbers map to
        # None and are rejected together with the unspecified value.
        action = moderationaction2sql.get(request.action)
        if action is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "action_must_be_specified")

        new_visibility = moderationvisibility2sql.get(request.visibility)
        if new_visibility is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "visibility_must_be_specified")

        reason = request.reason or "Moderated by admin"

        # Track old visibility for metrics
        old_visibility = moderation_state.visibility

        # Update visibility
        moderation_state.visibility = new_visibility
        moderation_state.updated = now()

        # Log the action
        log_entry = ModerationLog(
            moderation_state_id=moderation_state.id,
            action=action,
            moderator_user_id=context.user_id,
            new_visibility=new_visibility,
            reason=reason,
        )
        session.add(log_entry)
        # Flush so that log_entry.id is populated before it is referenced below.
        session.flush()

        # Resolve any pending queue items
        _resolve_pending_queue_items(session, moderation_state, log_entry, action)

        observe_moderation_action(action, moderation_state.object_type)
        observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type)

        # If visibility becomes VISIBLE or UNLISTED, trigger pending notifications
        if new_visibility in (ModerationVisibility.VISIBLE, ModerationVisibility.UNLISTED):
            pending_notifications = (
                session.execute(
                    select(Notification)
                    .where(Notification.moderation_state_id == moderation_state.id)
                    .where(not_(exists().where(NotificationDelivery.notification_id == Notification.id)))
                )
                .scalars()
                .all()
            )

            # Import here to avoid circular dependency
            from couchers.notifications.background import handle_notification

            for notification in pending_notifications:
                queue_job(
                    session,
                    job=handle_notification,
                    payload=jobs_pb2.HandleNotificationPayload(notification_id=notification.id),
                )

        return moderation_pb2.ModerateContentRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def FlagContentForReview(
        self, request: moderation_pb2.FlagContentForReviewReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.FlagContentForReviewRes:
        """Flag content for review by adding it to the moderation queue.

        An unspecified trigger defaults to INITIAL_REVIEW and an empty reason
        to a generic admin message. Aborts with NOT_FOUND if the moderation
        state does not exist.
        """
        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if not moderation_state:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found")

        trigger = moderationtrigger2sql[request.trigger] or ModerationTrigger.INITIAL_REVIEW
        reason = request.reason or "Flagged by admin for review"

        # Add to moderation queue
        queue_item = ModerationQueueItem(
            moderation_state_id=request.moderation_state_id,
            trigger=trigger,
            reason=reason,
        )
        session.add(queue_item)
        # Flush so that the new queue item's ID is available for the response.
        session.flush()

        observe_moderation_action(ModerationAction.FLAG, moderation_state.object_type)
        observe_moderation_queue_item_created(trigger, moderation_state.object_type)

        queue_item_pb = moderation_pb2.ModerationQueueItemInfo(
            queue_item_id=queue_item.id,
            moderation_state_id=queue_item.moderation_state_id,
            time_created=Timestamp_from_datetime(queue_item.time_created),
            trigger=moderationtrigger2api[queue_item.trigger],
            reason=queue_item.reason,
            is_resolved=False,
            resolved_by_log_id=0,
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

        return moderation_pb2.FlagContentForReviewRes(queue_item=queue_item_pb)

    def UnflagContent(
        self, request: moderation_pb2.UnflagContentReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.UnflagContentRes:
        """Unflag content by resolving pending queue items.

        Writes an UNFLAG log entry (visibility is left unchanged) and resolves
        all pending queue items for the state with it. Aborts with NOT_FOUND if
        the moderation state does not exist.
        """
        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if not moderation_state:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found")

        reason = request.reason or "Unflagged by admin"

        # Touch the moderation state so its `updated` time reflects this action.
        moderation_state.updated = now()

        # Log the unflag action
        log_entry = ModerationLog(
            moderation_state_id=moderation_state.id,
            action=ModerationAction.UNFLAG,
            moderator_user_id=context.user_id,
            new_visibility=None,
            reason=reason,
        )
        session.add(log_entry)
        # Flush so that log_entry.id is populated before it is referenced below.
        session.flush()

        # Resolve any pending queue items
        _resolve_pending_queue_items(session, moderation_state, log_entry, ModerationAction.UNFLAG)

        observe_moderation_action(ModerationAction.UNFLAG, moderation_state.object_type)

        return moderation_pb2.UnflagContentRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )