Coverage for src/couchers/servicers/moderation.py: 84%

151 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1import logging 

2 

3import grpc 

4from sqlalchemy import and_, exists, not_, or_ 

5 

6from couchers.jobs.enqueue import queue_job 

7from couchers.metrics import ( 

8 observe_moderation_action, 

9 observe_moderation_queue_item_created, 

10 observe_moderation_queue_item_resolved, 

11 observe_moderation_queue_resolution_time, 

12 observe_moderation_visibility_transition, 

13) 

14from couchers.models import ( 

15 GroupChat, 

16 HostRequest, 

17 Message, 

18 MessageType, 

19 ModerationAction, 

20 ModerationLog, 

21 ModerationObjectType, 

22 ModerationQueueItem, 

23 ModerationState, 

24 ModerationTrigger, 

25 ModerationVisibility, 

26 Notification, 

27 NotificationDelivery, 

28) 

29from couchers.proto import moderation_pb2, moderation_pb2_grpc 

30from couchers.proto.internal import jobs_pb2 

31from couchers.sql import couchers_select as select 

32from couchers.utils import Timestamp_from_datetime, now 

33 

34logger = logging.getLogger(__name__) 

35 

36MAX_PAGINATION_LENGTH = 1_000 

37 

38# Moderation enum mappings 

39moderationvisibility2api = { 

40 None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED, 

41 ModerationVisibility.HIDDEN: moderation_pb2.MODERATION_VISIBILITY_HIDDEN, 

42 ModerationVisibility.SHADOWED: moderation_pb2.MODERATION_VISIBILITY_SHADOWED, 

43 ModerationVisibility.VISIBLE: moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

44 ModerationVisibility.UNLISTED: moderation_pb2.MODERATION_VISIBILITY_UNLISTED, 

45} 

46 

47moderationvisibility2sql = { 

48 moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED: None, 

49 moderation_pb2.MODERATION_VISIBILITY_HIDDEN: ModerationVisibility.HIDDEN, 

50 moderation_pb2.MODERATION_VISIBILITY_SHADOWED: ModerationVisibility.SHADOWED, 

51 moderation_pb2.MODERATION_VISIBILITY_VISIBLE: ModerationVisibility.VISIBLE, 

52 moderation_pb2.MODERATION_VISIBILITY_UNLISTED: ModerationVisibility.UNLISTED, 

53} 

54 

55moderationtrigger2api = { 

56 None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED, 

57 ModerationTrigger.INITIAL_REVIEW: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW, 

58 ModerationTrigger.USER_FLAG: moderation_pb2.MODERATION_TRIGGER_USER_FLAG, 

59 ModerationTrigger.MACHINE_FLAG: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG, 

60 ModerationTrigger.MODERATOR_REVIEW: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW, 

61} 

62 

63moderationtrigger2sql = { 

64 moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED: None, 

65 moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW: ModerationTrigger.INITIAL_REVIEW, 

66 moderation_pb2.MODERATION_TRIGGER_USER_FLAG: ModerationTrigger.USER_FLAG, 

67 moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG: ModerationTrigger.MACHINE_FLAG, 

68 moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW: ModerationTrigger.MODERATOR_REVIEW, 

69} 

70 

71moderationaction2api = { 

72 None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED, 

73 ModerationAction.CREATE: moderation_pb2.MODERATION_ACTION_CREATE, 

74 ModerationAction.APPROVE: moderation_pb2.MODERATION_ACTION_APPROVE, 

75 ModerationAction.HIDE: moderation_pb2.MODERATION_ACTION_HIDE, 

76 ModerationAction.FLAG: moderation_pb2.MODERATION_ACTION_FLAG, 

77 ModerationAction.UNFLAG: moderation_pb2.MODERATION_ACTION_UNFLAG, 

78} 

79 

80moderationaction2sql = { 

81 moderation_pb2.MODERATION_ACTION_UNSPECIFIED: None, 

82 moderation_pb2.MODERATION_ACTION_CREATE: ModerationAction.CREATE, 

83 moderation_pb2.MODERATION_ACTION_APPROVE: ModerationAction.APPROVE, 

84 moderation_pb2.MODERATION_ACTION_HIDE: ModerationAction.HIDE, 

85 moderation_pb2.MODERATION_ACTION_FLAG: ModerationAction.FLAG, 

86 moderation_pb2.MODERATION_ACTION_UNFLAG: ModerationAction.UNFLAG, 

87} 

88 

89moderationobjecttype2api = { 

90 None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED, 

91 ModerationObjectType.HOST_REQUEST: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST, 

92 ModerationObjectType.GROUP_CHAT: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT, 

93} 

94 

95moderationobjecttype2sql = { 

96 moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: None, 

97 moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST: ModerationObjectType.HOST_REQUEST, 

98 moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT: ModerationObjectType.GROUP_CHAT, 

99} 

100 

101 

102def moderation_state_to_pb(state: ModerationState, session): 

103 """Convert ModerationState model to proto message""" 

104 object_type = state.object_type 

105 object_id = state.object_id 

106 

107 # Get the author user ID 

108 if object_type == ModerationObjectType.HOST_REQUEST: 

109 author_user_id = session.execute( 

110 select(HostRequest.surfer_user_id).where(HostRequest.conversation_id == object_id) 

111 ).scalar_one() 

112 elif object_type == ModerationObjectType.GROUP_CHAT: 

113 author_user_id = session.execute( 

114 select(GroupChat.creator_id).where(GroupChat.conversation_id == object_id) 

115 ).scalar_one() 

116 else: 

117 raise ValueError(f"Unsupported moderation object type: {object_type}") 

118 

119 # Get the first text message for this conversation 

120 content = session.execute( 

121 select(Message.text) 

122 .where(Message.conversation_id == object_id) 

123 .where(Message.message_type == MessageType.text) 

124 .order_by(Message.id.asc()) 

125 .limit(1) 

126 ).scalar_one_or_none() 

127 

128 state_pb = moderation_pb2.ModerationStateInfo( 

129 moderation_state_id=state.id, 

130 object_type=moderationobjecttype2api[state.object_type], 

131 object_id=state.object_id, 

132 visibility=moderationvisibility2api[state.visibility], 

133 created=Timestamp_from_datetime(state.created), 

134 updated=Timestamp_from_datetime(state.updated), 

135 author_user_id=author_user_id, 

136 content=content or "", 

137 ) 

138 

139 return state_pb 

140 

141 

142class Moderation(moderation_pb2_grpc.ModerationServicer): 

143 def GetModerationQueue(self, request, context, session): 

144 """Get moderation queue items with optional filtering""" 

145 

146 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

147 

148 # Build query 

149 statement = select(ModerationQueueItem) 

150 

151 # Apply page token filter based on ordering direction 

152 if request.page_token: 

153 page_token_id = int(request.page_token) 

154 if request.newest_first: 

155 # Descending order: get items with smaller IDs 

156 statement = statement.where(ModerationQueueItem.id < page_token_id) 

157 else: 

158 # Ascending order: get items with larger IDs 

159 statement = statement.where(ModerationQueueItem.id > page_token_id) 

160 

161 # Apply filters 

162 if request.triggers: 

163 internal_triggers = [moderationtrigger2sql[t] for t in request.triggers] 

164 statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers)) 

165 

166 if request.object_type and request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: 

167 internal_object_type = moderationobjecttype2sql[request.object_type] 

168 if internal_object_type: 

169 statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type) 

170 

171 if request.unresolved_only: 

172 statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

173 

174 if request.HasField("created_before"): 

175 created_before = request.created_before.ToDatetime() 

176 statement = statement.where(ModerationQueueItem.time_created < created_before) 

177 

178 if request.HasField("created_after"): 

179 created_after = request.created_after.ToDatetime() 

180 statement = statement.where(ModerationQueueItem.time_created > created_after) 

181 

182 if request.item_author_user_id: 

183 author_user_id = request.item_author_user_id 

184 

185 # Use EXISTS for efficient author filtering 

186 hr_exists = exists().where( 

187 and_( 

188 HostRequest.moderation_state_id == ModerationQueueItem.moderation_state_id, 

189 HostRequest.surfer_user_id == author_user_id, 

190 ) 

191 ) 

192 gc_exists = exists().where( 

193 and_( 

194 GroupChat.moderation_state_id == ModerationQueueItem.moderation_state_id, 

195 GroupChat.creator_id == author_user_id, 

196 ) 

197 ) 

198 statement = statement.where(or_(hr_exists, gc_exists)) 

199 

200 # Order by time created 

201 if request.newest_first: 

202 statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc()) 

203 else: 

204 statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc()) 

205 

206 queue_items = session.execute(statement.limit(page_size + 1)).scalars().all() 

207 

208 # Convert to proto 

209 queue_items_pb = [] 

210 for item in queue_items[:page_size]: 

211 # Fetch the moderation state for this queue item 

212 mod_state = session.execute( 

213 select(ModerationState).where(ModerationState.id == item.moderation_state_id) 

214 ).scalar_one() 

215 

216 queue_item_pb = moderation_pb2.ModerationQueueItemInfo( 

217 queue_item_id=item.id, 

218 moderation_state_id=item.moderation_state_id, 

219 time_created=Timestamp_from_datetime(item.time_created), 

220 trigger=moderationtrigger2api[item.trigger], 

221 reason=item.reason, 

222 is_resolved=item.resolved_by_log_id is not None, 

223 resolved_by_log_id=item.resolved_by_log_id or 0, 

224 moderation_state=moderation_state_to_pb(mod_state, session), 

225 ) 

226 

227 queue_items_pb.append(queue_item_pb) 

228 

229 return moderation_pb2.GetModerationQueueRes( 

230 queue_items=queue_items_pb, 

231 # Use the ID of the last returned item (not the extra fetched item) as the cursor 

232 next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None, 

233 ) 

234 

235 def GetModerationState(self, request, context, session): 

236 """Get moderation state by object type and object ID""" 

237 object_type = moderationobjecttype2sql[request.object_type] 

238 if object_type is None: 

239 context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.") 

240 

241 moderation_state = session.execute( 

242 select(ModerationState) 

243 .where(ModerationState.object_type == object_type) 

244 .where(ModerationState.object_id == request.object_id) 

245 ).scalar_one_or_none() 

246 if moderation_state is None: 

247 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

248 

249 return moderation_pb2.GetModerationStateRes( 

250 moderation_state=moderation_state_to_pb(moderation_state, session), 

251 ) 

252 

253 def GetModerationLog(self, request, context, session): 

254 """Get moderation log for a specific moderation state""" 

255 # Get the moderation state 

256 moderation_state = session.execute( 

257 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

258 ).scalar_one_or_none() 

259 if moderation_state is None: 

260 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

261 

262 # Get all log entries for this state, ordered by time (most recent first) 

263 log_entries = ( 

264 session.execute( 

265 select(ModerationLog) 

266 .where(ModerationLog.moderation_state_id == request.moderation_state_id) 

267 .order_by(ModerationLog.time.desc(), ModerationLog.id.desc()) 

268 ) 

269 .scalars() 

270 .all() 

271 ) 

272 

273 # Convert moderation state to proto first (while still in session) 

274 moderation_state_pb = moderation_state_to_pb(moderation_state, session) 

275 

276 # Convert to proto 

277 log_entries_pb = [] 

278 for entry in log_entries: 

279 log_entry_pb = moderation_pb2.ModerationLogEntryInfo( 

280 log_entry_id=entry.id, 

281 moderation_state_id=entry.moderation_state_id, 

282 time=Timestamp_from_datetime(entry.time), 

283 action=moderationaction2api[entry.action], 

284 moderator_user_id=entry.moderator_user_id, 

285 reason=entry.reason, 

286 ) 

287 

288 # Only include changed fields 

289 if entry.new_visibility is not None: 

290 log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility] 

291 

292 log_entries_pb.append(log_entry_pb) 

293 

294 return moderation_pb2.GetModerationLogRes( 

295 log_entries=log_entries_pb, 

296 moderation_state=moderation_state_pb, 

297 ) 

298 

299 def ModerateContent(self, request, context, session): 

300 """Unified moderation action - takes both action and visibility explicitly""" 

301 

302 moderation_state = session.execute( 

303 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

304 ).scalar_one_or_none() 

305 if moderation_state is None: 

306 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

307 

308 # Convert proto enums to internal enums 

309 action = moderationaction2sql[request.action] 

310 if action is None: 

311 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "action_must_be_specified") 

312 

313 new_visibility = moderationvisibility2sql[request.visibility] 

314 if new_visibility is None: 

315 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "visibility_must_be_specified") 

316 

317 reason = request.reason or "Moderated by admin" 

318 

319 # Track old visibility for metrics 

320 old_visibility = moderation_state.visibility 

321 

322 # Update visibility 

323 moderation_state.visibility = new_visibility 

324 moderation_state.updated = now() 

325 

326 # Log the action 

327 log_entry = ModerationLog( 

328 moderation_state_id=moderation_state.id, 

329 action=action, 

330 moderator_user_id=context.user_id, 

331 new_visibility=new_visibility, 

332 reason=reason, 

333 ) 

334 session.add(log_entry) 

335 session.flush() 

336 

337 # Resolve any pending queue items 

338 queue_item = session.execute( 

339 select(ModerationQueueItem) 

340 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

341 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

342 .order_by(ModerationQueueItem.time_created.desc()) 

343 ).scalar_one_or_none() 

344 

345 if queue_item: 

346 queue_item.resolved_by_log_id = log_entry.id 

347 session.flush() 

348 observe_moderation_queue_item_resolved(queue_item.trigger, action, moderation_state.object_type) 

349 observe_moderation_queue_resolution_time( 

350 queue_item.trigger, 

351 action, 

352 moderation_state.object_type, 

353 (now() - queue_item.time_created).total_seconds(), 

354 ) 

355 

356 observe_moderation_action(action, moderation_state.object_type) 

357 observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type) 

358 

359 # If visibility becomes VISIBLE or UNLISTED, trigger pending notifications 

360 if new_visibility in (ModerationVisibility.VISIBLE, ModerationVisibility.UNLISTED): 

361 pending_notifications = ( 

362 session.execute( 

363 select(Notification) 

364 .where(Notification.moderation_state_id == moderation_state.id) 

365 .where(not_(exists().where(NotificationDelivery.notification_id == Notification.id))) 

366 ) 

367 .scalars() 

368 .all() 

369 ) 

370 

371 for notification in pending_notifications: 

372 queue_job( 

373 session, 

374 job_type="handle_notification", 

375 payload=jobs_pb2.HandleNotificationPayload(notification_id=notification.id), 

376 ) 

377 

378 return moderation_pb2.ModerateContentRes( 

379 moderation_state=moderation_state_to_pb(moderation_state, session), 

380 ) 

381 

382 def FlagContentForReview(self, request, context, session): 

383 """Flag content for review by adding it to the moderation queue""" 

384 

385 moderation_state = session.execute( 

386 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

387 ).scalar_one_or_none() 

388 if not moderation_state: 

389 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found") 

390 

391 trigger = moderationtrigger2sql[request.trigger] or ModerationTrigger.INITIAL_REVIEW 

392 reason = request.reason or "Flagged by admin for review" 

393 

394 # Add to moderation queue 

395 queue_item = ModerationQueueItem( 

396 moderation_state_id=request.moderation_state_id, 

397 trigger=trigger, 

398 reason=reason, 

399 ) 

400 session.add(queue_item) 

401 session.flush() 

402 

403 observe_moderation_action(ModerationAction.FLAG, moderation_state.object_type) 

404 observe_moderation_queue_item_created(trigger, moderation_state.object_type) 

405 

406 queue_item_pb = moderation_pb2.ModerationQueueItemInfo( 

407 queue_item_id=queue_item.id, 

408 moderation_state_id=queue_item.moderation_state_id, 

409 time_created=Timestamp_from_datetime(queue_item.time_created), 

410 trigger=moderationtrigger2api[queue_item.trigger], 

411 reason=queue_item.reason, 

412 is_resolved=False, 

413 resolved_by_log_id=0, 

414 moderation_state=moderation_state_to_pb(moderation_state, session), 

415 ) 

416 

417 return moderation_pb2.FlagContentForReviewRes(queue_item=queue_item_pb) 

418 

419 def UnflagContent(self, request, context, session): 

420 """Unflag content by resolving pending queue items""" 

421 

422 moderation_state = session.execute( 

423 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

424 ).scalar_one_or_none() 

425 if not moderation_state: 

426 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found") 

427 

428 reason = request.reason or "Unflagged by admin" 

429 

430 # Update moderation state (inline moderate_content logic) 

431 moderation_state.updated = now() 

432 

433 # Log the unflag action 

434 log_entry = ModerationLog( 

435 moderation_state_id=moderation_state.id, 

436 action=ModerationAction.UNFLAG, 

437 moderator_user_id=context.user_id, 

438 new_visibility=None, 

439 reason=reason, 

440 ) 

441 session.add(log_entry) 

442 session.flush() 

443 

444 # Resolve any pending queue items (inline resolve_queue_item logic) 

445 queue_item = session.execute( 

446 select(ModerationQueueItem) 

447 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

448 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

449 .order_by(ModerationQueueItem.time_created.desc()) 

450 ).scalar_one_or_none() 

451 

452 if queue_item: 

453 queue_item.resolved_by_log_id = log_entry.id 

454 session.flush() 

455 observe_moderation_queue_item_resolved( 

456 queue_item.trigger, ModerationAction.UNFLAG, moderation_state.object_type 

457 ) 

458 observe_moderation_queue_resolution_time( 

459 queue_item.trigger, 

460 ModerationAction.UNFLAG, 

461 moderation_state.object_type, 

462 (now() - queue_item.time_created).total_seconds(), 

463 ) 

464 

465 observe_moderation_action(ModerationAction.UNFLAG, moderation_state.object_type) 

466 

467 return moderation_pb2.UnflagContentRes( 

468 moderation_state=moderation_state_to_pb(moderation_state, session), 

469 )