Coverage for src/couchers/servicers/moderation.py: 84%
151 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1import logging
3import grpc
4from sqlalchemy import and_, exists, not_, or_
6from couchers.jobs.enqueue import queue_job
7from couchers.metrics import (
8 observe_moderation_action,
9 observe_moderation_queue_item_created,
10 observe_moderation_queue_item_resolved,
11 observe_moderation_queue_resolution_time,
12 observe_moderation_visibility_transition,
13)
14from couchers.models import (
15 GroupChat,
16 HostRequest,
17 Message,
18 MessageType,
19 ModerationAction,
20 ModerationLog,
21 ModerationObjectType,
22 ModerationQueueItem,
23 ModerationState,
24 ModerationTrigger,
25 ModerationVisibility,
26 Notification,
27 NotificationDelivery,
28)
29from couchers.proto import moderation_pb2, moderation_pb2_grpc
30from couchers.proto.internal import jobs_pb2
31from couchers.sql import couchers_select as select
32from couchers.utils import Timestamp_from_datetime, now
34logger = logging.getLogger(__name__)
36MAX_PAGINATION_LENGTH = 1_000
38# Moderation enum mappings
39moderationvisibility2api = {
40 None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED,
41 ModerationVisibility.HIDDEN: moderation_pb2.MODERATION_VISIBILITY_HIDDEN,
42 ModerationVisibility.SHADOWED: moderation_pb2.MODERATION_VISIBILITY_SHADOWED,
43 ModerationVisibility.VISIBLE: moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
44 ModerationVisibility.UNLISTED: moderation_pb2.MODERATION_VISIBILITY_UNLISTED,
45}
47moderationvisibility2sql = {
48 moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED: None,
49 moderation_pb2.MODERATION_VISIBILITY_HIDDEN: ModerationVisibility.HIDDEN,
50 moderation_pb2.MODERATION_VISIBILITY_SHADOWED: ModerationVisibility.SHADOWED,
51 moderation_pb2.MODERATION_VISIBILITY_VISIBLE: ModerationVisibility.VISIBLE,
52 moderation_pb2.MODERATION_VISIBILITY_UNLISTED: ModerationVisibility.UNLISTED,
53}
55moderationtrigger2api = {
56 None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED,
57 ModerationTrigger.INITIAL_REVIEW: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW,
58 ModerationTrigger.USER_FLAG: moderation_pb2.MODERATION_TRIGGER_USER_FLAG,
59 ModerationTrigger.MACHINE_FLAG: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG,
60 ModerationTrigger.MODERATOR_REVIEW: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW,
61}
63moderationtrigger2sql = {
64 moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED: None,
65 moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW: ModerationTrigger.INITIAL_REVIEW,
66 moderation_pb2.MODERATION_TRIGGER_USER_FLAG: ModerationTrigger.USER_FLAG,
67 moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG: ModerationTrigger.MACHINE_FLAG,
68 moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW: ModerationTrigger.MODERATOR_REVIEW,
69}
71moderationaction2api = {
72 None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED,
73 ModerationAction.CREATE: moderation_pb2.MODERATION_ACTION_CREATE,
74 ModerationAction.APPROVE: moderation_pb2.MODERATION_ACTION_APPROVE,
75 ModerationAction.HIDE: moderation_pb2.MODERATION_ACTION_HIDE,
76 ModerationAction.FLAG: moderation_pb2.MODERATION_ACTION_FLAG,
77 ModerationAction.UNFLAG: moderation_pb2.MODERATION_ACTION_UNFLAG,
78}
80moderationaction2sql = {
81 moderation_pb2.MODERATION_ACTION_UNSPECIFIED: None,
82 moderation_pb2.MODERATION_ACTION_CREATE: ModerationAction.CREATE,
83 moderation_pb2.MODERATION_ACTION_APPROVE: ModerationAction.APPROVE,
84 moderation_pb2.MODERATION_ACTION_HIDE: ModerationAction.HIDE,
85 moderation_pb2.MODERATION_ACTION_FLAG: ModerationAction.FLAG,
86 moderation_pb2.MODERATION_ACTION_UNFLAG: ModerationAction.UNFLAG,
87}
89moderationobjecttype2api = {
90 None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED,
91 ModerationObjectType.HOST_REQUEST: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST,
92 ModerationObjectType.GROUP_CHAT: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT,
93}
95moderationobjecttype2sql = {
96 moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: None,
97 moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST: ModerationObjectType.HOST_REQUEST,
98 moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT: ModerationObjectType.GROUP_CHAT,
99}
102def moderation_state_to_pb(state: ModerationState, session):
103 """Convert ModerationState model to proto message"""
104 object_type = state.object_type
105 object_id = state.object_id
107 # Get the author user ID
108 if object_type == ModerationObjectType.HOST_REQUEST:
109 author_user_id = session.execute(
110 select(HostRequest.surfer_user_id).where(HostRequest.conversation_id == object_id)
111 ).scalar_one()
112 elif object_type == ModerationObjectType.GROUP_CHAT:
113 author_user_id = session.execute(
114 select(GroupChat.creator_id).where(GroupChat.conversation_id == object_id)
115 ).scalar_one()
116 else:
117 raise ValueError(f"Unsupported moderation object type: {object_type}")
119 # Get the first text message for this conversation
120 content = session.execute(
121 select(Message.text)
122 .where(Message.conversation_id == object_id)
123 .where(Message.message_type == MessageType.text)
124 .order_by(Message.id.asc())
125 .limit(1)
126 ).scalar_one_or_none()
128 state_pb = moderation_pb2.ModerationStateInfo(
129 moderation_state_id=state.id,
130 object_type=moderationobjecttype2api[state.object_type],
131 object_id=state.object_id,
132 visibility=moderationvisibility2api[state.visibility],
133 created=Timestamp_from_datetime(state.created),
134 updated=Timestamp_from_datetime(state.updated),
135 author_user_id=author_user_id,
136 content=content or "",
137 )
139 return state_pb
142class Moderation(moderation_pb2_grpc.ModerationServicer):
143 def GetModerationQueue(self, request, context, session):
144 """Get moderation queue items with optional filtering"""
146 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)
148 # Build query
149 statement = select(ModerationQueueItem)
151 # Apply page token filter based on ordering direction
152 if request.page_token:
153 page_token_id = int(request.page_token)
154 if request.newest_first:
155 # Descending order: get items with smaller IDs
156 statement = statement.where(ModerationQueueItem.id < page_token_id)
157 else:
158 # Ascending order: get items with larger IDs
159 statement = statement.where(ModerationQueueItem.id > page_token_id)
161 # Apply filters
162 if request.triggers:
163 internal_triggers = [moderationtrigger2sql[t] for t in request.triggers]
164 statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers))
166 if request.object_type and request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED:
167 internal_object_type = moderationobjecttype2sql[request.object_type]
168 if internal_object_type:
169 statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type)
171 if request.unresolved_only:
172 statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None))
174 if request.HasField("created_before"):
175 created_before = request.created_before.ToDatetime()
176 statement = statement.where(ModerationQueueItem.time_created < created_before)
178 if request.HasField("created_after"):
179 created_after = request.created_after.ToDatetime()
180 statement = statement.where(ModerationQueueItem.time_created > created_after)
182 if request.item_author_user_id:
183 author_user_id = request.item_author_user_id
185 # Use EXISTS for efficient author filtering
186 hr_exists = exists().where(
187 and_(
188 HostRequest.moderation_state_id == ModerationQueueItem.moderation_state_id,
189 HostRequest.surfer_user_id == author_user_id,
190 )
191 )
192 gc_exists = exists().where(
193 and_(
194 GroupChat.moderation_state_id == ModerationQueueItem.moderation_state_id,
195 GroupChat.creator_id == author_user_id,
196 )
197 )
198 statement = statement.where(or_(hr_exists, gc_exists))
200 # Order by time created
201 if request.newest_first:
202 statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc())
203 else:
204 statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc())
206 queue_items = session.execute(statement.limit(page_size + 1)).scalars().all()
208 # Convert to proto
209 queue_items_pb = []
210 for item in queue_items[:page_size]:
211 # Fetch the moderation state for this queue item
212 mod_state = session.execute(
213 select(ModerationState).where(ModerationState.id == item.moderation_state_id)
214 ).scalar_one()
216 queue_item_pb = moderation_pb2.ModerationQueueItemInfo(
217 queue_item_id=item.id,
218 moderation_state_id=item.moderation_state_id,
219 time_created=Timestamp_from_datetime(item.time_created),
220 trigger=moderationtrigger2api[item.trigger],
221 reason=item.reason,
222 is_resolved=item.resolved_by_log_id is not None,
223 resolved_by_log_id=item.resolved_by_log_id or 0,
224 moderation_state=moderation_state_to_pb(mod_state, session),
225 )
227 queue_items_pb.append(queue_item_pb)
229 return moderation_pb2.GetModerationQueueRes(
230 queue_items=queue_items_pb,
231 # Use the ID of the last returned item (not the extra fetched item) as the cursor
232 next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None,
233 )
235 def GetModerationState(self, request, context, session):
236 """Get moderation state by object type and object ID"""
237 object_type = moderationobjecttype2sql[request.object_type]
238 if object_type is None:
239 context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.")
241 moderation_state = session.execute(
242 select(ModerationState)
243 .where(ModerationState.object_type == object_type)
244 .where(ModerationState.object_id == request.object_id)
245 ).scalar_one_or_none()
246 if moderation_state is None:
247 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")
249 return moderation_pb2.GetModerationStateRes(
250 moderation_state=moderation_state_to_pb(moderation_state, session),
251 )
253 def GetModerationLog(self, request, context, session):
254 """Get moderation log for a specific moderation state"""
255 # Get the moderation state
256 moderation_state = session.execute(
257 select(ModerationState).where(ModerationState.id == request.moderation_state_id)
258 ).scalar_one_or_none()
259 if moderation_state is None:
260 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")
262 # Get all log entries for this state, ordered by time (most recent first)
263 log_entries = (
264 session.execute(
265 select(ModerationLog)
266 .where(ModerationLog.moderation_state_id == request.moderation_state_id)
267 .order_by(ModerationLog.time.desc(), ModerationLog.id.desc())
268 )
269 .scalars()
270 .all()
271 )
273 # Convert moderation state to proto first (while still in session)
274 moderation_state_pb = moderation_state_to_pb(moderation_state, session)
276 # Convert to proto
277 log_entries_pb = []
278 for entry in log_entries:
279 log_entry_pb = moderation_pb2.ModerationLogEntryInfo(
280 log_entry_id=entry.id,
281 moderation_state_id=entry.moderation_state_id,
282 time=Timestamp_from_datetime(entry.time),
283 action=moderationaction2api[entry.action],
284 moderator_user_id=entry.moderator_user_id,
285 reason=entry.reason,
286 )
288 # Only include changed fields
289 if entry.new_visibility is not None:
290 log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility]
292 log_entries_pb.append(log_entry_pb)
294 return moderation_pb2.GetModerationLogRes(
295 log_entries=log_entries_pb,
296 moderation_state=moderation_state_pb,
297 )
299 def ModerateContent(self, request, context, session):
300 """Unified moderation action - takes both action and visibility explicitly"""
302 moderation_state = session.execute(
303 select(ModerationState).where(ModerationState.id == request.moderation_state_id)
304 ).scalar_one_or_none()
305 if moderation_state is None:
306 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")
308 # Convert proto enums to internal enums
309 action = moderationaction2sql[request.action]
310 if action is None:
311 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "action_must_be_specified")
313 new_visibility = moderationvisibility2sql[request.visibility]
314 if new_visibility is None:
315 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "visibility_must_be_specified")
317 reason = request.reason or "Moderated by admin"
319 # Track old visibility for metrics
320 old_visibility = moderation_state.visibility
322 # Update visibility
323 moderation_state.visibility = new_visibility
324 moderation_state.updated = now()
326 # Log the action
327 log_entry = ModerationLog(
328 moderation_state_id=moderation_state.id,
329 action=action,
330 moderator_user_id=context.user_id,
331 new_visibility=new_visibility,
332 reason=reason,
333 )
334 session.add(log_entry)
335 session.flush()
337 # Resolve any pending queue items
338 queue_item = session.execute(
339 select(ModerationQueueItem)
340 .where(ModerationQueueItem.moderation_state_id == moderation_state.id)
341 .where(ModerationQueueItem.resolved_by_log_id.is_(None))
342 .order_by(ModerationQueueItem.time_created.desc())
343 ).scalar_one_or_none()
345 if queue_item:
346 queue_item.resolved_by_log_id = log_entry.id
347 session.flush()
348 observe_moderation_queue_item_resolved(queue_item.trigger, action, moderation_state.object_type)
349 observe_moderation_queue_resolution_time(
350 queue_item.trigger,
351 action,
352 moderation_state.object_type,
353 (now() - queue_item.time_created).total_seconds(),
354 )
356 observe_moderation_action(action, moderation_state.object_type)
357 observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type)
359 # If visibility becomes VISIBLE or UNLISTED, trigger pending notifications
360 if new_visibility in (ModerationVisibility.VISIBLE, ModerationVisibility.UNLISTED):
361 pending_notifications = (
362 session.execute(
363 select(Notification)
364 .where(Notification.moderation_state_id == moderation_state.id)
365 .where(not_(exists().where(NotificationDelivery.notification_id == Notification.id)))
366 )
367 .scalars()
368 .all()
369 )
371 for notification in pending_notifications:
372 queue_job(
373 session,
374 job_type="handle_notification",
375 payload=jobs_pb2.HandleNotificationPayload(notification_id=notification.id),
376 )
378 return moderation_pb2.ModerateContentRes(
379 moderation_state=moderation_state_to_pb(moderation_state, session),
380 )
382 def FlagContentForReview(self, request, context, session):
383 """Flag content for review by adding it to the moderation queue"""
385 moderation_state = session.execute(
386 select(ModerationState).where(ModerationState.id == request.moderation_state_id)
387 ).scalar_one_or_none()
388 if not moderation_state:
389 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found")
391 trigger = moderationtrigger2sql[request.trigger] or ModerationTrigger.INITIAL_REVIEW
392 reason = request.reason or "Flagged by admin for review"
394 # Add to moderation queue
395 queue_item = ModerationQueueItem(
396 moderation_state_id=request.moderation_state_id,
397 trigger=trigger,
398 reason=reason,
399 )
400 session.add(queue_item)
401 session.flush()
403 observe_moderation_action(ModerationAction.FLAG, moderation_state.object_type)
404 observe_moderation_queue_item_created(trigger, moderation_state.object_type)
406 queue_item_pb = moderation_pb2.ModerationQueueItemInfo(
407 queue_item_id=queue_item.id,
408 moderation_state_id=queue_item.moderation_state_id,
409 time_created=Timestamp_from_datetime(queue_item.time_created),
410 trigger=moderationtrigger2api[queue_item.trigger],
411 reason=queue_item.reason,
412 is_resolved=False,
413 resolved_by_log_id=0,
414 moderation_state=moderation_state_to_pb(moderation_state, session),
415 )
417 return moderation_pb2.FlagContentForReviewRes(queue_item=queue_item_pb)
419 def UnflagContent(self, request, context, session):
420 """Unflag content by resolving pending queue items"""
422 moderation_state = session.execute(
423 select(ModerationState).where(ModerationState.id == request.moderation_state_id)
424 ).scalar_one_or_none()
425 if not moderation_state:
426 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found")
428 reason = request.reason or "Unflagged by admin"
430 # Update moderation state (inline moderate_content logic)
431 moderation_state.updated = now()
433 # Log the unflag action
434 log_entry = ModerationLog(
435 moderation_state_id=moderation_state.id,
436 action=ModerationAction.UNFLAG,
437 moderator_user_id=context.user_id,
438 new_visibility=None,
439 reason=reason,
440 )
441 session.add(log_entry)
442 session.flush()
444 # Resolve any pending queue items (inline resolve_queue_item logic)
445 queue_item = session.execute(
446 select(ModerationQueueItem)
447 .where(ModerationQueueItem.moderation_state_id == moderation_state.id)
448 .where(ModerationQueueItem.resolved_by_log_id.is_(None))
449 .order_by(ModerationQueueItem.time_created.desc())
450 ).scalar_one_or_none()
452 if queue_item:
453 queue_item.resolved_by_log_id = log_entry.id
454 session.flush()
455 observe_moderation_queue_item_resolved(
456 queue_item.trigger, ModerationAction.UNFLAG, moderation_state.object_type
457 )
458 observe_moderation_queue_resolution_time(
459 queue_item.trigger,
460 ModerationAction.UNFLAG,
461 moderation_state.object_type,
462 (now() - queue_item.time_created).total_seconds(),
463 )
465 observe_moderation_action(ModerationAction.UNFLAG, moderation_state.object_type)
467 return moderation_pb2.UnflagContentRes(
468 moderation_state=moderation_state_to_pb(moderation_state, session),
469 )