Coverage for app / backend / src / couchers / servicers / moderation.py: 83%

164 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1import logging 

2from typing import TYPE_CHECKING 

3 

4import grpc 

5from sqlalchemy import and_, exists, not_, or_, select 

6from sqlalchemy.orm import Session 

7 

8from couchers.context import CouchersContext 

9from couchers.jobs.enqueue import queue_job 

10from couchers.metrics import ( 

11 observe_moderation_action, 

12 observe_moderation_queue_item_created, 

13 observe_moderation_queue_item_resolved, 

14 observe_moderation_queue_resolution_time, 

15 observe_moderation_visibility_transition, 

16) 

17from couchers.models import ( 

18 Event, 

19 EventOccurrence, 

20 FriendRelationship, 

21 GroupChat, 

22 HostRequest, 

23 Message, 

24 MessageType, 

25 ModerationAction, 

26 ModerationLog, 

27 ModerationObjectType, 

28 ModerationQueueItem, 

29 ModerationState, 

30 ModerationTrigger, 

31 ModerationVisibility, 

32 Notification, 

33 NotificationDelivery, 

34) 

35from couchers.proto import moderation_pb2, moderation_pb2_grpc 

36from couchers.proto.internal import jobs_pb2 

37from couchers.utils import Timestamp_from_datetime, now 

38 

39if TYPE_CHECKING: 

40 from couchers.sql import _ModeratedContent 

41 

42logger = logging.getLogger(__name__) 

43 

44MAX_PAGINATION_LENGTH = 1_000 

45 

46# Moderation enum mappings 

47moderationvisibility2api = { 

48 None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED, 

49 ModerationVisibility.hidden: moderation_pb2.MODERATION_VISIBILITY_HIDDEN, 

50 ModerationVisibility.shadowed: moderation_pb2.MODERATION_VISIBILITY_SHADOWED, 

51 ModerationVisibility.visible: moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

52 ModerationVisibility.unlisted: moderation_pb2.MODERATION_VISIBILITY_UNLISTED, 

53} 

54 

55moderationvisibility2sql = { 

56 moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED: None, 

57 moderation_pb2.MODERATION_VISIBILITY_HIDDEN: ModerationVisibility.hidden, 

58 moderation_pb2.MODERATION_VISIBILITY_SHADOWED: ModerationVisibility.shadowed, 

59 moderation_pb2.MODERATION_VISIBILITY_VISIBLE: ModerationVisibility.visible, 

60 moderation_pb2.MODERATION_VISIBILITY_UNLISTED: ModerationVisibility.unlisted, 

61} 

62 

63moderationtrigger2api = { 

64 None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED, 

65 ModerationTrigger.initial_review: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW, 

66 ModerationTrigger.user_flag: moderation_pb2.MODERATION_TRIGGER_USER_FLAG, 

67 ModerationTrigger.machine_flag: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG, 

68 ModerationTrigger.moderator_review: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW, 

69} 

70 

71moderationtrigger2sql = { 

72 moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED: None, 

73 moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW: ModerationTrigger.initial_review, 

74 moderation_pb2.MODERATION_TRIGGER_USER_FLAG: ModerationTrigger.user_flag, 

75 moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG: ModerationTrigger.machine_flag, 

76 moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW: ModerationTrigger.moderator_review, 

77} 

78 

79moderationaction2api = { 

80 None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED, 

81 ModerationAction.create: moderation_pb2.MODERATION_ACTION_CREATE, 

82 ModerationAction.approve: moderation_pb2.MODERATION_ACTION_APPROVE, 

83 ModerationAction.hide: moderation_pb2.MODERATION_ACTION_HIDE, 

84 ModerationAction.flag: moderation_pb2.MODERATION_ACTION_FLAG, 

85 ModerationAction.unflag: moderation_pb2.MODERATION_ACTION_UNFLAG, 

86} 

87 

88moderationaction2sql = { 

89 moderation_pb2.MODERATION_ACTION_UNSPECIFIED: None, 

90 moderation_pb2.MODERATION_ACTION_CREATE: ModerationAction.create, 

91 moderation_pb2.MODERATION_ACTION_APPROVE: ModerationAction.approve, 

92 moderation_pb2.MODERATION_ACTION_HIDE: ModerationAction.hide, 

93 moderation_pb2.MODERATION_ACTION_FLAG: ModerationAction.flag, 

94 moderation_pb2.MODERATION_ACTION_UNFLAG: ModerationAction.unflag, 

95} 

96 

97moderationobjecttype2api = { 

98 None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED, 

99 ModerationObjectType.host_request: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST, 

100 ModerationObjectType.group_chat: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT, 

101 ModerationObjectType.friend_request: moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST, 

102 ModerationObjectType.event_occurrence: moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE, 

103} 

104 

105moderationobjecttype2sql = { 

106 moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: None, 

107 moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST: ModerationObjectType.host_request, 

108 moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT: ModerationObjectType.group_chat, 

109 moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST: ModerationObjectType.friend_request, 

110 moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE: ModerationObjectType.event_occurrence, 

111} 

112 

113# Mapping from ModerationObjectType to the SQLAlchemy model class 

114moderationobjecttype2model: dict[ModerationObjectType, _ModeratedContent] = { 

115 ModerationObjectType.host_request: HostRequest, 

116 ModerationObjectType.group_chat: GroupChat, 

117 ModerationObjectType.friend_request: FriendRelationship, 

118 ModerationObjectType.event_occurrence: EventOccurrence, 

119} 

120 

121 

122def moderation_state_to_pb(state: ModerationState, session: Session) -> moderation_pb2.ModerationStateInfo: 

123 """Convert ModerationState model to proto message""" 

124 object_type = state.object_type 

125 object_id = state.object_id 

126 

127 # Get the author user ID and content based on object type 

128 if object_type == ModerationObjectType.host_request: 

129 author_user_id = session.execute( 

130 select(HostRequest.surfer_user_id).where(HostRequest.conversation_id == object_id) 

131 ).scalar_one() 

132 # Get the first text message for this conversation 

133 content = session.execute( 

134 select(Message.text) 

135 .where(Message.conversation_id == object_id) 

136 .where(Message.message_type == MessageType.text) 

137 .order_by(Message.id.asc()) 

138 .limit(1) 

139 ).scalar_one_or_none() 

140 elif object_type == ModerationObjectType.group_chat: 

141 author_user_id = session.execute( 

142 select(GroupChat.creator_id).where(GroupChat.conversation_id == object_id) 

143 ).scalar_one() 

144 # Get the first text message for this conversation 

145 content = session.execute( 

146 select(Message.text) 

147 .where(Message.conversation_id == object_id) 

148 .where(Message.message_type == MessageType.text) 

149 .order_by(Message.id.asc()) 

150 .limit(1) 

151 ).scalar_one_or_none() 

152 elif object_type == ModerationObjectType.friend_request: 

153 author_user_id = session.execute( 

154 select(FriendRelationship.from_user_id).where(FriendRelationship.id == object_id) 

155 ).scalar_one() 

156 # Friend requests have no text content 

157 content = None 

158 elif object_type == ModerationObjectType.event_occurrence: 158 ↛ 166line 158 didn't jump to line 166 because the condition on line 158 was always true

159 author_user_id, title, description = session.execute( 

160 select(EventOccurrence.creator_user_id, Event.title, EventOccurrence.content) 

161 .join(Event, Event.id == EventOccurrence.event_id) 

162 .where(EventOccurrence.id == object_id) 

163 ).one() 

164 content = f"{title}\n\n{description}" 

165 else: 

166 raise ValueError(f"Unsupported moderation object type: {object_type}") 

167 

168 state_pb = moderation_pb2.ModerationStateInfo( 

169 moderation_state_id=state.id, 

170 object_type=moderationobjecttype2api[state.object_type], 

171 object_id=state.object_id, 

172 visibility=moderationvisibility2api[state.visibility], 

173 created=Timestamp_from_datetime(state.created), 

174 updated=Timestamp_from_datetime(state.updated), 

175 author_user_id=author_user_id, 

176 content=content or "", 

177 ) 

178 

179 return state_pb 

180 

181 

182class Moderation(moderation_pb2_grpc.ModerationServicer): 

183 def GetModerationQueue( 

184 self, request: moderation_pb2.GetModerationQueueReq, context: CouchersContext, session: Session 

185 ) -> moderation_pb2.GetModerationQueueRes: 

186 """Get moderation queue items with optional filtering""" 

187 

188 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

189 

190 # Build query 

191 statement = select(ModerationQueueItem) 

192 

193 # Apply page token filter based on ordering direction 

194 if request.page_token: 

195 page_token_id = int(request.page_token) 

196 if request.newest_first: 196 ↛ 201line 196 didn't jump to line 201 because the condition on line 196 was always true

197 # Descending order: get items with smaller IDs 

198 statement = statement.where(ModerationQueueItem.id < page_token_id) 

199 else: 

200 # Ascending order: get items with larger IDs 

201 statement = statement.where(ModerationQueueItem.id > page_token_id) 

202 

203 # Apply filters 

204 if request.triggers: 

205 internal_triggers = [moderationtrigger2sql[t] for t in request.triggers] 

206 statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers)) 

207 

208 if request.object_type and request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true

209 internal_object_type = moderationobjecttype2sql[request.object_type] 

210 if internal_object_type: 

211 statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type) 

212 

213 if request.unresolved_only: 

214 statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

215 

216 if request.HasField("created_before"): 

217 created_before = request.created_before.ToDatetime() 

218 statement = statement.where(ModerationQueueItem.time_created < created_before) 

219 

220 if request.HasField("created_after"): 

221 created_after = request.created_after.ToDatetime() 

222 statement = statement.where(ModerationQueueItem.time_created > created_after) 

223 

224 if request.item_author_user_id: 

225 author_user_id = request.item_author_user_id 

226 

227 # Use EXISTS for efficient author filtering 

228 author_exists_clauses = [] 

229 for model in moderationobjecttype2model.values(): 

230 author_col = getattr(model, model.__moderation_author_column__) 

231 author_exists_clauses.append( 

232 exists().where( 

233 and_( 

234 model.moderation_state_id == ModerationQueueItem.moderation_state_id, 

235 author_col == author_user_id, 

236 ) 

237 ) 

238 ) 

239 statement = statement.where(or_(*author_exists_clauses)) 

240 

241 # Order by time created 

242 if request.newest_first: 

243 statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc()) 

244 else: 

245 statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc()) 

246 

247 queue_items = session.execute(statement.limit(page_size + 1)).scalars().all() 

248 

249 # Convert to proto 

250 queue_items_pb = [] 

251 for item in queue_items[:page_size]: 

252 # Fetch the moderation state for this queue item 

253 mod_state = session.execute( 

254 select(ModerationState).where(ModerationState.id == item.moderation_state_id) 

255 ).scalar_one() 

256 

257 queue_item_pb = moderation_pb2.ModerationQueueItemInfo( 

258 queue_item_id=item.id, 

259 moderation_state_id=item.moderation_state_id, 

260 time_created=Timestamp_from_datetime(item.time_created), 

261 trigger=moderationtrigger2api[item.trigger], 

262 reason=item.reason, 

263 is_resolved=item.resolved_by_log_id is not None, 

264 resolved_by_log_id=item.resolved_by_log_id or 0, 

265 moderation_state=moderation_state_to_pb(mod_state, session), 

266 ) 

267 

268 queue_items_pb.append(queue_item_pb) 

269 

270 return moderation_pb2.GetModerationQueueRes( 

271 queue_items=queue_items_pb, 

272 # Use the ID of the last returned item (not the extra fetched item) as the cursor 

273 next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None, 

274 ) 

275 

276 def GetModerationState( 

277 self, request: moderation_pb2.GetModerationStateReq, context: CouchersContext, session: Session 

278 ) -> moderation_pb2.GetModerationStateRes: 

279 """Get moderation state by object type and object ID""" 

280 object_type = moderationobjecttype2sql[request.object_type] 

281 if object_type is None: 

282 context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.") 

283 

284 moderation_state = session.execute( 

285 select(ModerationState) 

286 .where(ModerationState.object_type == object_type) 

287 .where(ModerationState.object_id == request.object_id) 

288 ).scalar_one_or_none() 

289 if moderation_state is None: 

290 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

291 

292 return moderation_pb2.GetModerationStateRes( 

293 moderation_state=moderation_state_to_pb(moderation_state, session), 

294 ) 

295 

296 def GetModerationLog( 

297 self, request: moderation_pb2.GetModerationLogReq, context: CouchersContext, session: Session 

298 ) -> moderation_pb2.GetModerationLogRes: 

299 """Get moderation log for a specific moderation state""" 

300 # Get the moderation state 

301 moderation_state = session.execute( 

302 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

303 ).scalar_one_or_none() 

304 if moderation_state is None: 

305 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

306 

307 # Get all log entries for this state, ordered by time (most recent first) 

308 log_entries = ( 

309 session.execute( 

310 select(ModerationLog) 

311 .where(ModerationLog.moderation_state_id == request.moderation_state_id) 

312 .order_by(ModerationLog.time.desc(), ModerationLog.id.desc()) 

313 ) 

314 .scalars() 

315 .all() 

316 ) 

317 

318 # Convert moderation state to proto first (while still in session) 

319 moderation_state_pb = moderation_state_to_pb(moderation_state, session) 

320 

321 # Convert to proto 

322 log_entries_pb = [] 

323 for entry in log_entries: 

324 log_entry_pb = moderation_pb2.ModerationLogEntryInfo( 

325 log_entry_id=entry.id, 

326 moderation_state_id=entry.moderation_state_id, 

327 time=Timestamp_from_datetime(entry.time), 

328 action=moderationaction2api[entry.action], 

329 moderator_user_id=entry.moderator_user_id, 

330 reason=entry.reason, 

331 ) 

332 

333 # Only include changed fields 

334 if entry.new_visibility is not None: 334 ↛ 337line 334 didn't jump to line 337 because the condition on line 334 was always true

335 log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility] 

336 

337 log_entries_pb.append(log_entry_pb) 

338 

339 return moderation_pb2.GetModerationLogRes( 

340 log_entries=log_entries_pb, 

341 moderation_state=moderation_state_pb, 

342 ) 

343 

344 def ModerateContent( 

345 self, request: moderation_pb2.ModerateContentReq, context: CouchersContext, session: Session 

346 ) -> moderation_pb2.ModerateContentRes: 

347 """Unified moderation action - takes both action and visibility explicitly""" 

348 

349 moderation_state = session.execute( 

350 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

351 ).scalar_one_or_none() 

352 if moderation_state is None: 

353 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

354 

355 # Convert proto enums to internal enums 

356 action = moderationaction2sql[request.action] 

357 if action is None: 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true

358 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "action_must_be_specified") 

359 

360 new_visibility = moderationvisibility2sql[request.visibility] 

361 if new_visibility is None: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true

362 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "visibility_must_be_specified") 

363 

364 reason = request.reason or "Moderated by admin" 

365 

366 # Track old visibility for metrics 

367 old_visibility = moderation_state.visibility 

368 

369 # Update visibility 

370 moderation_state.visibility = new_visibility 

371 moderation_state.updated = now() 

372 

373 # Log the action 

374 log_entry = ModerationLog( 

375 moderation_state_id=moderation_state.id, 

376 action=action, 

377 moderator_user_id=context.user_id, 

378 new_visibility=new_visibility, 

379 reason=reason, 

380 ) 

381 session.add(log_entry) 

382 session.flush() 

383 

384 # Resolve any pending queue items 

385 queue_item = session.execute( 

386 select(ModerationQueueItem) 

387 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

388 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

389 .order_by(ModerationQueueItem.time_created.desc()) 

390 ).scalar_one_or_none() 

391 

392 if queue_item: 

393 queue_item.resolved_by_log_id = log_entry.id 

394 session.flush() 

395 observe_moderation_queue_item_resolved(queue_item.trigger, action, moderation_state.object_type) 

396 observe_moderation_queue_resolution_time( 

397 queue_item.trigger, 

398 action, 

399 moderation_state.object_type, 

400 (now() - queue_item.time_created).total_seconds(), 

401 ) 

402 

403 observe_moderation_action(action, moderation_state.object_type) 

404 observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type) 

405 

406 # If visibility becomes VISIBLE or UNLISTED, trigger pending notifications 

407 if new_visibility in (ModerationVisibility.visible, ModerationVisibility.unlisted): 

408 pending_notifications = ( 

409 session.execute( 

410 select(Notification) 

411 .where(Notification.moderation_state_id == moderation_state.id) 

412 .where(not_(exists().where(NotificationDelivery.notification_id == Notification.id))) 

413 ) 

414 .scalars() 

415 .all() 

416 ) 

417 

418 # Import here to avoid circular dependency 

419 from couchers.notifications.background import handle_notification 

420 

421 for notification in pending_notifications: 

422 queue_job( 

423 session, 

424 job=handle_notification, 

425 payload=jobs_pb2.HandleNotificationPayload(notification_id=notification.id), 

426 ) 

427 

428 return moderation_pb2.ModerateContentRes( 

429 moderation_state=moderation_state_to_pb(moderation_state, session), 

430 ) 

431 

432 def FlagContentForReview( 

433 self, request: moderation_pb2.FlagContentForReviewReq, context: CouchersContext, session: Session 

434 ) -> moderation_pb2.FlagContentForReviewRes: 

435 """Flag content for review by adding it to the moderation queue""" 

436 

437 moderation_state = session.execute( 

438 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

439 ).scalar_one_or_none() 

440 if not moderation_state: 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true

441 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found") 

442 

443 trigger = moderationtrigger2sql[request.trigger] or ModerationTrigger.initial_review 

444 reason = request.reason or "Flagged by admin for review" 

445 

446 # Add to moderation queue 

447 queue_item = ModerationQueueItem( 

448 moderation_state_id=request.moderation_state_id, 

449 trigger=trigger, 

450 reason=reason, 

451 ) 

452 session.add(queue_item) 

453 session.flush() 

454 

455 observe_moderation_action(ModerationAction.flag, moderation_state.object_type) 

456 observe_moderation_queue_item_created(trigger, moderation_state.object_type) 

457 

458 queue_item_pb = moderation_pb2.ModerationQueueItemInfo( 

459 queue_item_id=queue_item.id, 

460 moderation_state_id=queue_item.moderation_state_id, 

461 time_created=Timestamp_from_datetime(queue_item.time_created), 

462 trigger=moderationtrigger2api[queue_item.trigger], 

463 reason=queue_item.reason, 

464 is_resolved=False, 

465 resolved_by_log_id=0, 

466 moderation_state=moderation_state_to_pb(moderation_state, session), 

467 ) 

468 

469 return moderation_pb2.FlagContentForReviewRes(queue_item=queue_item_pb) 

470 

471 def UnflagContent( 

472 self, request: moderation_pb2.UnflagContentReq, context: CouchersContext, session: Session 

473 ) -> moderation_pb2.UnflagContentRes: 

474 """Unflag content by resolving pending queue items""" 

475 

476 moderation_state = session.execute( 

477 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

478 ).scalar_one_or_none() 

479 if not moderation_state: 

480 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found") 

481 

482 reason = request.reason or "Unflagged by admin" 

483 

484 # Update moderation state (inline moderate_content logic) 

485 moderation_state.updated = now() 

486 

487 # Log the unflag action 

488 log_entry = ModerationLog( 

489 moderation_state_id=moderation_state.id, 

490 action=ModerationAction.unflag, 

491 moderator_user_id=context.user_id, 

492 new_visibility=None, 

493 reason=reason, 

494 ) 

495 session.add(log_entry) 

496 session.flush() 

497 

498 # Resolve any pending queue items (inline resolve_queue_item logic) 

499 queue_item = session.execute( 

500 select(ModerationQueueItem) 

501 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

502 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

503 .order_by(ModerationQueueItem.time_created.desc()) 

504 ).scalar_one_or_none() 

505 

506 if queue_item: 

507 queue_item.resolved_by_log_id = log_entry.id 

508 session.flush() 

509 observe_moderation_queue_item_resolved( 

510 queue_item.trigger, ModerationAction.unflag, moderation_state.object_type 

511 ) 

512 observe_moderation_queue_resolution_time( 

513 queue_item.trigger, 

514 ModerationAction.unflag, 

515 moderation_state.object_type, 

516 (now() - queue_item.time_created).total_seconds(), 

517 ) 

518 

519 observe_moderation_action(ModerationAction.unflag, moderation_state.object_type) 

520 

521 return moderation_pb2.UnflagContentRes( 

522 moderation_state=moderation_state_to_pb(moderation_state, session), 

523 )