Coverage for app / backend / src / couchers / servicers / moderation.py: 82%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-03 06:18 +0000

1import logging 

2 

3import grpc 

4from sqlalchemy import and_, exists, not_, or_, select 

5from sqlalchemy.orm import Session 

6 

7from couchers.context import CouchersContext 

8from couchers.jobs.enqueue import queue_job 

9from couchers.metrics import ( 

10 observe_moderation_action, 

11 observe_moderation_queue_item_created, 

12 observe_moderation_queue_item_resolved, 

13 observe_moderation_queue_resolution_time, 

14 observe_moderation_visibility_transition, 

15) 

16from couchers.models import ( 

17 GroupChat, 

18 HostRequest, 

19 Message, 

20 MessageType, 

21 ModerationAction, 

22 ModerationLog, 

23 ModerationObjectType, 

24 ModerationQueueItem, 

25 ModerationState, 

26 ModerationTrigger, 

27 ModerationVisibility, 

28 Notification, 

29 NotificationDelivery, 

30) 

31from couchers.proto import moderation_pb2, moderation_pb2_grpc 

32from couchers.proto.internal import jobs_pb2 

33from couchers.utils import Timestamp_from_datetime, now 

34 

35logger = logging.getLogger(__name__) 

36 

37MAX_PAGINATION_LENGTH = 1_000 

38 

39# Moderation enum mappings 

40moderationvisibility2api = { 

41 None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED, 

42 ModerationVisibility.HIDDEN: moderation_pb2.MODERATION_VISIBILITY_HIDDEN, 

43 ModerationVisibility.SHADOWED: moderation_pb2.MODERATION_VISIBILITY_SHADOWED, 

44 ModerationVisibility.VISIBLE: moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

45 ModerationVisibility.UNLISTED: moderation_pb2.MODERATION_VISIBILITY_UNLISTED, 

46} 

47 

48moderationvisibility2sql = { 

49 moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED: None, 

50 moderation_pb2.MODERATION_VISIBILITY_HIDDEN: ModerationVisibility.HIDDEN, 

51 moderation_pb2.MODERATION_VISIBILITY_SHADOWED: ModerationVisibility.SHADOWED, 

52 moderation_pb2.MODERATION_VISIBILITY_VISIBLE: ModerationVisibility.VISIBLE, 

53 moderation_pb2.MODERATION_VISIBILITY_UNLISTED: ModerationVisibility.UNLISTED, 

54} 

55 

56moderationtrigger2api = { 

57 None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED, 

58 ModerationTrigger.INITIAL_REVIEW: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW, 

59 ModerationTrigger.USER_FLAG: moderation_pb2.MODERATION_TRIGGER_USER_FLAG, 

60 ModerationTrigger.MACHINE_FLAG: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG, 

61 ModerationTrigger.MODERATOR_REVIEW: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW, 

62} 

63 

64moderationtrigger2sql = { 

65 moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED: None, 

66 moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW: ModerationTrigger.INITIAL_REVIEW, 

67 moderation_pb2.MODERATION_TRIGGER_USER_FLAG: ModerationTrigger.USER_FLAG, 

68 moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG: ModerationTrigger.MACHINE_FLAG, 

69 moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW: ModerationTrigger.MODERATOR_REVIEW, 

70} 

71 

72moderationaction2api = { 

73 None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED, 

74 ModerationAction.CREATE: moderation_pb2.MODERATION_ACTION_CREATE, 

75 ModerationAction.APPROVE: moderation_pb2.MODERATION_ACTION_APPROVE, 

76 ModerationAction.HIDE: moderation_pb2.MODERATION_ACTION_HIDE, 

77 ModerationAction.FLAG: moderation_pb2.MODERATION_ACTION_FLAG, 

78 ModerationAction.UNFLAG: moderation_pb2.MODERATION_ACTION_UNFLAG, 

79} 

80 

81moderationaction2sql = { 

82 moderation_pb2.MODERATION_ACTION_UNSPECIFIED: None, 

83 moderation_pb2.MODERATION_ACTION_CREATE: ModerationAction.CREATE, 

84 moderation_pb2.MODERATION_ACTION_APPROVE: ModerationAction.APPROVE, 

85 moderation_pb2.MODERATION_ACTION_HIDE: ModerationAction.HIDE, 

86 moderation_pb2.MODERATION_ACTION_FLAG: ModerationAction.FLAG, 

87 moderation_pb2.MODERATION_ACTION_UNFLAG: ModerationAction.UNFLAG, 

88} 

89 

90moderationobjecttype2api = { 

91 None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED, 

92 ModerationObjectType.HOST_REQUEST: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST, 

93 ModerationObjectType.GROUP_CHAT: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT, 

94} 

95 

96moderationobjecttype2sql = { 

97 moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: None, 

98 moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST: ModerationObjectType.HOST_REQUEST, 

99 moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT: ModerationObjectType.GROUP_CHAT, 

100} 

101 

102 

103def moderation_state_to_pb(state: ModerationState, session: Session) -> moderation_pb2.ModerationStateInfo: 

104 """Convert ModerationState model to proto message""" 

105 object_type = state.object_type 

106 object_id = state.object_id 

107 

108 # Get the author user ID 

109 if object_type == ModerationObjectType.HOST_REQUEST: 

110 author_user_id = session.execute( 

111 select(HostRequest.surfer_user_id).where(HostRequest.conversation_id == object_id) 

112 ).scalar_one() 

113 elif object_type == ModerationObjectType.GROUP_CHAT: 113 ↛ 118line 113 didn't jump to line 118 because the condition on line 113 was always true

114 author_user_id = session.execute( 

115 select(GroupChat.creator_id).where(GroupChat.conversation_id == object_id) 

116 ).scalar_one() 

117 else: 

118 raise ValueError(f"Unsupported moderation object type: {object_type}") 

119 

120 # Get the first text message for this conversation 

121 content = session.execute( 

122 select(Message.text) 

123 .where(Message.conversation_id == object_id) 

124 .where(Message.message_type == MessageType.text) 

125 .order_by(Message.id.asc()) 

126 .limit(1) 

127 ).scalar_one_or_none() 

128 

129 state_pb = moderation_pb2.ModerationStateInfo( 

130 moderation_state_id=state.id, 

131 object_type=moderationobjecttype2api[state.object_type], 

132 object_id=state.object_id, 

133 visibility=moderationvisibility2api[state.visibility], 

134 created=Timestamp_from_datetime(state.created), 

135 updated=Timestamp_from_datetime(state.updated), 

136 author_user_id=author_user_id, 

137 content=content or "", 

138 ) 

139 

140 return state_pb 

141 

142 

143class Moderation(moderation_pb2_grpc.ModerationServicer): 

144 def GetModerationQueue( 

145 self, request: moderation_pb2.GetModerationQueueReq, context: CouchersContext, session: Session 

146 ) -> moderation_pb2.GetModerationQueueRes: 

147 """Get moderation queue items with optional filtering""" 

148 

149 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

150 

151 # Build query 

152 statement = select(ModerationQueueItem) 

153 

154 # Apply page token filter based on ordering direction 

155 if request.page_token: 

156 page_token_id = int(request.page_token) 

157 if request.newest_first: 157 ↛ 162line 157 didn't jump to line 162 because the condition on line 157 was always true

158 # Descending order: get items with smaller IDs 

159 statement = statement.where(ModerationQueueItem.id < page_token_id) 

160 else: 

161 # Ascending order: get items with larger IDs 

162 statement = statement.where(ModerationQueueItem.id > page_token_id) 

163 

164 # Apply filters 

165 if request.triggers: 

166 internal_triggers = [moderationtrigger2sql[t] for t in request.triggers] 

167 statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers)) 

168 

169 if request.object_type and request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true

170 internal_object_type = moderationobjecttype2sql[request.object_type] 

171 if internal_object_type: 

172 statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type) 

173 

174 if request.unresolved_only: 

175 statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

176 

177 if request.HasField("created_before"): 

178 created_before = request.created_before.ToDatetime() 

179 statement = statement.where(ModerationQueueItem.time_created < created_before) 

180 

181 if request.HasField("created_after"): 

182 created_after = request.created_after.ToDatetime() 

183 statement = statement.where(ModerationQueueItem.time_created > created_after) 

184 

185 if request.item_author_user_id: 

186 author_user_id = request.item_author_user_id 

187 

188 # Use EXISTS for efficient author filtering 

189 hr_exists = exists().where( 

190 and_( 

191 HostRequest.moderation_state_id == ModerationQueueItem.moderation_state_id, 

192 HostRequest.surfer_user_id == author_user_id, 

193 ) 

194 ) 

195 gc_exists = exists().where( 

196 and_( 

197 GroupChat.moderation_state_id == ModerationQueueItem.moderation_state_id, 

198 GroupChat.creator_id == author_user_id, 

199 ) 

200 ) 

201 statement = statement.where(or_(hr_exists, gc_exists)) 

202 

203 # Order by time created 

204 if request.newest_first: 

205 statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc()) 

206 else: 

207 statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc()) 

208 

209 queue_items = session.execute(statement.limit(page_size + 1)).scalars().all() 

210 

211 # Convert to proto 

212 queue_items_pb = [] 

213 for item in queue_items[:page_size]: 

214 # Fetch the moderation state for this queue item 

215 mod_state = session.execute( 

216 select(ModerationState).where(ModerationState.id == item.moderation_state_id) 

217 ).scalar_one() 

218 

219 queue_item_pb = moderation_pb2.ModerationQueueItemInfo( 

220 queue_item_id=item.id, 

221 moderation_state_id=item.moderation_state_id, 

222 time_created=Timestamp_from_datetime(item.time_created), 

223 trigger=moderationtrigger2api[item.trigger], 

224 reason=item.reason, 

225 is_resolved=item.resolved_by_log_id is not None, 

226 resolved_by_log_id=item.resolved_by_log_id or 0, 

227 moderation_state=moderation_state_to_pb(mod_state, session), 

228 ) 

229 

230 queue_items_pb.append(queue_item_pb) 

231 

232 return moderation_pb2.GetModerationQueueRes( 

233 queue_items=queue_items_pb, 

234 # Use the ID of the last returned item (not the extra fetched item) as the cursor 

235 next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None, 

236 ) 

237 

238 def GetModerationState( 

239 self, request: moderation_pb2.GetModerationStateReq, context: CouchersContext, session: Session 

240 ) -> moderation_pb2.GetModerationStateRes: 

241 """Get moderation state by object type and object ID""" 

242 object_type = moderationobjecttype2sql[request.object_type] 

243 if object_type is None: 

244 context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.") 

245 

246 moderation_state = session.execute( 

247 select(ModerationState) 

248 .where(ModerationState.object_type == object_type) 

249 .where(ModerationState.object_id == request.object_id) 

250 ).scalar_one_or_none() 

251 if moderation_state is None: 

252 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

253 

254 return moderation_pb2.GetModerationStateRes( 

255 moderation_state=moderation_state_to_pb(moderation_state, session), 

256 ) 

257 

258 def GetModerationLog( 

259 self, request: moderation_pb2.GetModerationLogReq, context: CouchersContext, session: Session 

260 ) -> moderation_pb2.GetModerationLogRes: 

261 """Get moderation log for a specific moderation state""" 

262 # Get the moderation state 

263 moderation_state = session.execute( 

264 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

265 ).scalar_one_or_none() 

266 if moderation_state is None: 

267 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

268 

269 # Get all log entries for this state, ordered by time (most recent first) 

270 log_entries = ( 

271 session.execute( 

272 select(ModerationLog) 

273 .where(ModerationLog.moderation_state_id == request.moderation_state_id) 

274 .order_by(ModerationLog.time.desc(), ModerationLog.id.desc()) 

275 ) 

276 .scalars() 

277 .all() 

278 ) 

279 

280 # Convert moderation state to proto first (while still in session) 

281 moderation_state_pb = moderation_state_to_pb(moderation_state, session) 

282 

283 # Convert to proto 

284 log_entries_pb = [] 

285 for entry in log_entries: 

286 log_entry_pb = moderation_pb2.ModerationLogEntryInfo( 

287 log_entry_id=entry.id, 

288 moderation_state_id=entry.moderation_state_id, 

289 time=Timestamp_from_datetime(entry.time), 

290 action=moderationaction2api[entry.action], 

291 moderator_user_id=entry.moderator_user_id, 

292 reason=entry.reason, 

293 ) 

294 

295 # Only include changed fields 

296 if entry.new_visibility is not None: 296 ↛ 299line 296 didn't jump to line 299 because the condition on line 296 was always true

297 log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility] 

298 

299 log_entries_pb.append(log_entry_pb) 

300 

301 return moderation_pb2.GetModerationLogRes( 

302 log_entries=log_entries_pb, 

303 moderation_state=moderation_state_pb, 

304 ) 

305 

306 def ModerateContent( 

307 self, request: moderation_pb2.ModerateContentReq, context: CouchersContext, session: Session 

308 ) -> moderation_pb2.ModerateContentRes: 

309 """Unified moderation action - takes both action and visibility explicitly""" 

310 

311 moderation_state = session.execute( 

312 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

313 ).scalar_one_or_none() 

314 if moderation_state is None: 

315 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

316 

317 # Convert proto enums to internal enums 

318 action = moderationaction2sql[request.action] 

319 if action is None: 319 ↛ 320line 319 didn't jump to line 320 because the condition on line 319 was never true

320 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "action_must_be_specified") 

321 

322 new_visibility = moderationvisibility2sql[request.visibility] 

323 if new_visibility is None: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true

324 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "visibility_must_be_specified") 

325 

326 reason = request.reason or "Moderated by admin" 

327 

328 # Track old visibility for metrics 

329 old_visibility = moderation_state.visibility 

330 

331 # Update visibility 

332 moderation_state.visibility = new_visibility 

333 moderation_state.updated = now() 

334 

335 # Log the action 

336 log_entry = ModerationLog( 

337 moderation_state_id=moderation_state.id, 

338 action=action, 

339 moderator_user_id=context.user_id, 

340 new_visibility=new_visibility, 

341 reason=reason, 

342 ) 

343 session.add(log_entry) 

344 session.flush() 

345 

346 # Resolve any pending queue items 

347 queue_item = session.execute( 

348 select(ModerationQueueItem) 

349 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

350 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

351 .order_by(ModerationQueueItem.time_created.desc()) 

352 ).scalar_one_or_none() 

353 

354 if queue_item: 

355 queue_item.resolved_by_log_id = log_entry.id 

356 session.flush() 

357 observe_moderation_queue_item_resolved(queue_item.trigger, action, moderation_state.object_type) 

358 observe_moderation_queue_resolution_time( 

359 queue_item.trigger, 

360 action, 

361 moderation_state.object_type, 

362 (now() - queue_item.time_created).total_seconds(), 

363 ) 

364 

365 observe_moderation_action(action, moderation_state.object_type) 

366 observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type) 

367 

368 # If visibility becomes VISIBLE or UNLISTED, trigger pending notifications 

369 if new_visibility in (ModerationVisibility.VISIBLE, ModerationVisibility.UNLISTED): 

370 pending_notifications = ( 

371 session.execute( 

372 select(Notification) 

373 .where(Notification.moderation_state_id == moderation_state.id) 

374 .where(not_(exists().where(NotificationDelivery.notification_id == Notification.id))) 

375 ) 

376 .scalars() 

377 .all() 

378 ) 

379 

380 # Import here to avoid circular dependency 

381 from couchers.notifications.background import handle_notification 

382 

383 for notification in pending_notifications: 

384 queue_job( 

385 session, 

386 job=handle_notification, 

387 payload=jobs_pb2.HandleNotificationPayload(notification_id=notification.id), 

388 ) 

389 

390 return moderation_pb2.ModerateContentRes( 

391 moderation_state=moderation_state_to_pb(moderation_state, session), 

392 ) 

393 

394 def FlagContentForReview( 

395 self, request: moderation_pb2.FlagContentForReviewReq, context: CouchersContext, session: Session 

396 ) -> moderation_pb2.FlagContentForReviewRes: 

397 """Flag content for review by adding it to the moderation queue""" 

398 

399 moderation_state = session.execute( 

400 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

401 ).scalar_one_or_none() 

402 if not moderation_state: 402 ↛ 403line 402 didn't jump to line 403 because the condition on line 402 was never true

403 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found") 

404 

405 trigger = moderationtrigger2sql[request.trigger] or ModerationTrigger.INITIAL_REVIEW 

406 reason = request.reason or "Flagged by admin for review" 

407 

408 # Add to moderation queue 

409 queue_item = ModerationQueueItem( 

410 moderation_state_id=request.moderation_state_id, 

411 trigger=trigger, 

412 reason=reason, 

413 ) 

414 session.add(queue_item) 

415 session.flush() 

416 

417 observe_moderation_action(ModerationAction.FLAG, moderation_state.object_type) 

418 observe_moderation_queue_item_created(trigger, moderation_state.object_type) 

419 

420 queue_item_pb = moderation_pb2.ModerationQueueItemInfo( 

421 queue_item_id=queue_item.id, 

422 moderation_state_id=queue_item.moderation_state_id, 

423 time_created=Timestamp_from_datetime(queue_item.time_created), 

424 trigger=moderationtrigger2api[queue_item.trigger], 

425 reason=queue_item.reason, 

426 is_resolved=False, 

427 resolved_by_log_id=0, 

428 moderation_state=moderation_state_to_pb(moderation_state, session), 

429 ) 

430 

431 return moderation_pb2.FlagContentForReviewRes(queue_item=queue_item_pb) 

432 

433 def UnflagContent( 

434 self, request: moderation_pb2.UnflagContentReq, context: CouchersContext, session: Session 

435 ) -> moderation_pb2.UnflagContentRes: 

436 """Unflag content by resolving pending queue items""" 

437 

438 moderation_state = session.execute( 

439 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

440 ).scalar_one_or_none() 

441 if not moderation_state: 

442 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found") 

443 

444 reason = request.reason or "Unflagged by admin" 

445 

446 # Update moderation state (inline moderate_content logic) 

447 moderation_state.updated = now() 

448 

449 # Log the unflag action 

450 log_entry = ModerationLog( 

451 moderation_state_id=moderation_state.id, 

452 action=ModerationAction.UNFLAG, 

453 moderator_user_id=context.user_id, 

454 new_visibility=None, 

455 reason=reason, 

456 ) 

457 session.add(log_entry) 

458 session.flush() 

459 

460 # Resolve any pending queue items (inline resolve_queue_item logic) 

461 queue_item = session.execute( 

462 select(ModerationQueueItem) 

463 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

464 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

465 .order_by(ModerationQueueItem.time_created.desc()) 

466 ).scalar_one_or_none() 

467 

468 if queue_item: 

469 queue_item.resolved_by_log_id = log_entry.id 

470 session.flush() 

471 observe_moderation_queue_item_resolved( 

472 queue_item.trigger, ModerationAction.UNFLAG, moderation_state.object_type 

473 ) 

474 observe_moderation_queue_resolution_time( 

475 queue_item.trigger, 

476 ModerationAction.UNFLAG, 

477 moderation_state.object_type, 

478 (now() - queue_item.time_created).total_seconds(), 

479 ) 

480 

481 observe_moderation_action(ModerationAction.UNFLAG, moderation_state.object_type) 

482 

483 return moderation_pb2.UnflagContentRes( 

484 moderation_state=moderation_state_to_pb(moderation_state, session), 

485 )