Coverage for app/backend/src/couchers/servicers/moderation.py: 93%

263 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 09:29 +0000

1import logging 

2 

3import grpc 

4from sqlalchemy import and_, exists, not_, or_, select 

5from sqlalchemy.orm import Session 

6 

7from couchers.context import CouchersContext 

8from couchers.jobs.enqueue import queue_job 

9from couchers.metrics import ( 

10 observe_moderation_action, 

11 observe_moderation_queue_item_created, 

12 observe_moderation_queue_item_resolved, 

13 observe_moderation_queue_resolution_time, 

14 observe_moderation_visibility_transition, 

15) 

16from couchers.models import ( 

17 AdminActionLevel, 

18 Comment, 

19 Discussion, 

20 Event, 

21 EventOccurrence, 

22 FriendRelationship, 

23 GroupChat, 

24 HostRequest, 

25 Message, 

26 MessageType, 

27 ModerationAction, 

28 ModerationLog, 

29 ModerationObjectType, 

30 ModerationQueueItem, 

31 ModerationState, 

32 ModerationTrigger, 

33 ModerationVisibility, 

34 Notification, 

35 NotificationDelivery, 

36 Reference, 

37 Reply, 

38 User, 

39 get_moderated_models, 

40) 

41from couchers.proto import moderation_pb2, moderation_pb2_grpc 

42from couchers.proto.internal import jobs_pb2 

43from couchers.utils import Timestamp_from_datetime, now 

44 

45logger = logging.getLogger(__name__) 

46 

47MAX_PAGINATION_LENGTH = 1_000 

48 

49# Moderation enum mappings 

50moderationvisibility2api = { 

51 None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED, 

52 ModerationVisibility.hidden: moderation_pb2.MODERATION_VISIBILITY_HIDDEN, 

53 ModerationVisibility.shadowed: moderation_pb2.MODERATION_VISIBILITY_SHADOWED, 

54 ModerationVisibility.visible: moderation_pb2.MODERATION_VISIBILITY_VISIBLE, 

55 ModerationVisibility.unlisted: moderation_pb2.MODERATION_VISIBILITY_UNLISTED, 

56} 

57 

58moderationvisibility2sql = { 

59 moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED: None, 

60 moderation_pb2.MODERATION_VISIBILITY_HIDDEN: ModerationVisibility.hidden, 

61 moderation_pb2.MODERATION_VISIBILITY_SHADOWED: ModerationVisibility.shadowed, 

62 moderation_pb2.MODERATION_VISIBILITY_VISIBLE: ModerationVisibility.visible, 

63 moderation_pb2.MODERATION_VISIBILITY_UNLISTED: ModerationVisibility.unlisted, 

64} 

65 

66moderationtrigger2api = { 

67 None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED, 

68 ModerationTrigger.initial_review: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW, 

69 ModerationTrigger.user_flag: moderation_pb2.MODERATION_TRIGGER_USER_FLAG, 

70 ModerationTrigger.machine_flag: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG, 

71 ModerationTrigger.moderator_review: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW, 

72} 

73 

74moderationtrigger2sql = { 

75 moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED: None, 

76 moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW: ModerationTrigger.initial_review, 

77 moderation_pb2.MODERATION_TRIGGER_USER_FLAG: ModerationTrigger.user_flag, 

78 moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG: ModerationTrigger.machine_flag, 

79 moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW: ModerationTrigger.moderator_review, 

80} 

81 

82moderationaction2api = { 

83 None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED, 

84 ModerationAction.create: moderation_pb2.MODERATION_ACTION_CREATE, 

85 ModerationAction.approve: moderation_pb2.MODERATION_ACTION_APPROVE, 

86 ModerationAction.hide: moderation_pb2.MODERATION_ACTION_HIDE, 

87 ModerationAction.flag: moderation_pb2.MODERATION_ACTION_FLAG, 

88 ModerationAction.unflag: moderation_pb2.MODERATION_ACTION_UNFLAG, 

89 ModerationAction.set_priority: moderation_pb2.MODERATION_ACTION_SET_PRIORITY, 

90 ModerationAction.bulk_set_visibility: moderation_pb2.MODERATION_ACTION_BULK_SET_VISIBILITY, 

91} 

92 

93moderationaction2sql = { 

94 moderation_pb2.MODERATION_ACTION_UNSPECIFIED: None, 

95 moderation_pb2.MODERATION_ACTION_CREATE: ModerationAction.create, 

96 moderation_pb2.MODERATION_ACTION_APPROVE: ModerationAction.approve, 

97 moderation_pb2.MODERATION_ACTION_HIDE: ModerationAction.hide, 

98 moderation_pb2.MODERATION_ACTION_FLAG: ModerationAction.flag, 

99 moderation_pb2.MODERATION_ACTION_UNFLAG: ModerationAction.unflag, 

100 moderation_pb2.MODERATION_ACTION_SET_PRIORITY: ModerationAction.set_priority, 

101 moderation_pb2.MODERATION_ACTION_BULK_SET_VISIBILITY: ModerationAction.bulk_set_visibility, 

102} 

103 

104moderationobjecttype2api = { 

105 None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED, 

106 ModerationObjectType.host_request: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST, 

107 ModerationObjectType.group_chat: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT, 

108 ModerationObjectType.friend_request: moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST, 

109 ModerationObjectType.event_occurrence: moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE, 

110 ModerationObjectType.comment: moderation_pb2.MODERATION_OBJECT_TYPE_COMMENT, 

111 ModerationObjectType.reply: moderation_pb2.MODERATION_OBJECT_TYPE_REPLY, 

112 ModerationObjectType.discussion: moderation_pb2.MODERATION_OBJECT_TYPE_DISCUSSION, 

113 ModerationObjectType.reference: moderation_pb2.MODERATION_OBJECT_TYPE_REFERENCE, 

114} 

115 

116moderationobjecttype2sql = { 

117 moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: None, 

118 moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST: ModerationObjectType.host_request, 

119 moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT: ModerationObjectType.group_chat, 

120 moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST: ModerationObjectType.friend_request, 

121 moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE: ModerationObjectType.event_occurrence, 

122 moderation_pb2.MODERATION_OBJECT_TYPE_COMMENT: ModerationObjectType.comment, 

123 moderation_pb2.MODERATION_OBJECT_TYPE_REPLY: ModerationObjectType.reply, 

124 moderation_pb2.MODERATION_OBJECT_TYPE_DISCUSSION: ModerationObjectType.discussion, 

125 moderation_pb2.MODERATION_OBJECT_TYPE_REFERENCE: ModerationObjectType.reference, 

126} 

127 

128 

129def _resolve_queue_item( 

130 queue_item: ModerationQueueItem, 

131 log_entry: ModerationLog, 

132 action: ModerationAction, 

133 object_type: ModerationObjectType, 

134) -> None: 

135 """Resolve an open queue item against the given log entry and record metrics.""" 

136 queue_item.resolved_by_log_id = log_entry.id 

137 observe_moderation_queue_item_resolved(queue_item.trigger, action, object_type) 

138 observe_moderation_queue_resolution_time( 

139 queue_item.trigger, 

140 action, 

141 object_type, 

142 (now() - queue_item.time_created).total_seconds(), 

143 ) 

144 

145 

146def bulk_set_user_content_visibility( 

147 session: Session, 

148 user: User, 

149 new_visibility: ModerationVisibility, 

150 moderator_user_id: int, 

151 from_visibilities: set[ModerationVisibility] | None = None, 

152 reason: str | None = None, 

153) -> int: 

154 """Set visibility on every UMS-governed object authored by the user. Returns count of updated states.""" 

155 final_reason = reason or f"Bulk visibility update for user {user.id} to {new_visibility.name}" 

156 

157 author_exists_clauses = [] 

158 for entry in get_moderated_models().values(): 

159 author_exists_clauses.append( 

160 exists().where(and_(entry.moderation_state_id_column == ModerationState.id, entry.author_column == user.id)) 

161 ) 

162 

163 states = session.execute(select(ModerationState).where(or_(*author_exists_clauses))).scalars().all() 

164 

165 updated_count = 0 

166 for moderation_state in states: 

167 if from_visibilities and moderation_state.visibility not in from_visibilities: 

168 continue 

169 if moderation_state.visibility == new_visibility: 

170 continue 

171 

172 old_visibility = moderation_state.visibility 

173 moderation_state.visibility = new_visibility 

174 moderation_state.updated = now() 

175 

176 log_entry = ModerationLog( 

177 moderation_state_id=moderation_state.id, 

178 action=ModerationAction.bulk_set_visibility, 

179 moderator_user_id=moderator_user_id, 

180 new_visibility=new_visibility, 

181 reason=final_reason, 

182 ) 

183 session.add(log_entry) 

184 session.flush() 

185 

186 open_items = ( 

187 session.execute( 

188 select(ModerationQueueItem) 

189 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

190 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

191 ) 

192 .scalars() 

193 .all() 

194 ) 

195 for queue_item in open_items: 

196 _resolve_queue_item( 

197 queue_item, log_entry, ModerationAction.bulk_set_visibility, moderation_state.object_type 

198 ) 

199 session.flush() 

200 

201 observe_moderation_action(ModerationAction.bulk_set_visibility, moderation_state.object_type) 

202 observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type) 

203 

204 if new_visibility in (ModerationVisibility.visible, ModerationVisibility.unlisted): 

205 _enqueue_pending_notifications(session, moderation_state.id) 

206 

207 updated_count += 1 

208 

209 return updated_count 

210 

211 

212def _enqueue_pending_notifications(session: Session, moderation_state_id: int) -> None: 

213 """Re-queue any pending notifications linked to the given moderation state whose deliveries were suppressed.""" 

214 pending_notifications = ( 

215 session.execute( 

216 select(Notification) 

217 .where(Notification.moderation_state_id == moderation_state_id) 

218 .where(not_(exists().where(NotificationDelivery.notification_id == Notification.id))) 

219 ) 

220 .scalars() 

221 .all() 

222 ) 

223 

224 # Import here to avoid circular dependency 

225 from couchers.notifications.background import handle_notification # noqa: PLC0415 

226 

227 for notification in pending_notifications: 

228 queue_job( 

229 session, 

230 job=handle_notification, 

231 payload=jobs_pb2.HandleNotificationPayload(notification_id=notification.id), 

232 ) 

233 

234 

235def moderation_state_to_pb(state: ModerationState, session: Session) -> moderation_pb2.ModerationStateInfo: 

236 """Convert ModerationState model to proto message""" 

237 object_type = state.object_type 

238 object_id = state.object_id 

239 

240 # Get the author user ID and content based on object type 

241 if object_type == ModerationObjectType.host_request: 

242 author_user_id = session.execute( 

243 select(HostRequest.initiator_user_id).where(HostRequest.conversation_id == object_id) 

244 ).scalar_one() 

245 # Get the first text message for this conversation 

246 content = session.execute( 

247 select(Message.text) 

248 .where(Message.conversation_id == object_id) 

249 .where(Message.message_type == MessageType.text) 

250 .order_by(Message.id.asc()) 

251 .limit(1) 

252 ).scalar_one_or_none() 

253 elif object_type == ModerationObjectType.group_chat: 

254 author_user_id = session.execute( 

255 select(GroupChat.creator_id).where(GroupChat.conversation_id == object_id) 

256 ).scalar_one() 

257 # Get the first text message for this conversation 

258 content = session.execute( 

259 select(Message.text) 

260 .where(Message.conversation_id == object_id) 

261 .where(Message.message_type == MessageType.text) 

262 .order_by(Message.id.asc()) 

263 .limit(1) 

264 ).scalar_one_or_none() 

265 elif object_type == ModerationObjectType.friend_request: 

266 author_user_id = session.execute( 

267 select(FriendRelationship.from_user_id).where(FriendRelationship.id == object_id) 

268 ).scalar_one() 

269 # Friend requests have no text content 

270 content = None 

271 elif object_type == ModerationObjectType.event_occurrence: 

272 author_user_id, title, description = session.execute( 

273 select(EventOccurrence.creator_user_id, Event.title, EventOccurrence.content) 

274 .join(Event, Event.id == EventOccurrence.event_id) 

275 .where(EventOccurrence.id == object_id) 

276 ).one() 

277 content = f"{title}\n\n{description}" 

278 elif object_type == ModerationObjectType.comment: 

279 author_user_id, content = session.execute( 

280 select(Comment.author_user_id, Comment.content).where(Comment.id == object_id) 

281 ).one() 

282 elif object_type == ModerationObjectType.reply: 

283 author_user_id, content = session.execute( 

284 select(Reply.author_user_id, Reply.content).where(Reply.id == object_id) 

285 ).one() 

286 elif object_type == ModerationObjectType.discussion: 

287 author_user_id, title, body = session.execute( 

288 select(Discussion.creator_user_id, Discussion.title, Discussion.content).where(Discussion.id == object_id) 

289 ).one() 

290 content = f"{title}\n\n{body}" 

291 elif object_type == ModerationObjectType.reference: 291 ↛ 296line 291 didn't jump to line 296 because the condition on line 291 was always true

292 author_user_id, content = session.execute( 

293 select(Reference.from_user_id, Reference.text).where(Reference.id == object_id) 

294 ).one() 

295 else: 

296 raise ValueError(f"Unsupported moderation object type: {object_type}") 

297 

298 # Import here to avoid circular dependency 

299 from couchers.servicers.admin import _user_to_details # noqa: PLC0415 

300 

301 author = session.execute(select(User).where(User.id == author_user_id)).scalar_one() 

302 

303 state_pb = moderation_pb2.ModerationStateInfo( 

304 moderation_state_id=state.id, 

305 object_type=moderationobjecttype2api[state.object_type], 

306 object_id=state.object_id, 

307 visibility=moderationvisibility2api[state.visibility], 

308 created=Timestamp_from_datetime(state.created), 

309 updated=Timestamp_from_datetime(state.updated), 

310 author_user_id=author_user_id, 

311 author=_user_to_details(session, author), 

312 content=content or "", 

313 ) 

314 

315 return state_pb 

316 

317 

318class Moderation(moderation_pb2_grpc.ModerationServicer): 

319 def GetModerationQueue( 

320 self, request: moderation_pb2.GetModerationQueueReq, context: CouchersContext, session: Session 

321 ) -> moderation_pb2.GetModerationQueueRes: 

322 """Get moderation queue items with optional filtering""" 

323 

324 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

325 

326 # Build query 

327 statement = select(ModerationQueueItem) 

328 

329 # Apply page token filter based on ordering direction 

330 if request.page_token: 

331 page_token_id = int(request.page_token) 

332 if request.newest_first: 332 ↛ 337line 332 didn't jump to line 337 because the condition on line 332 was always true

333 # Descending order: get items with smaller IDs 

334 statement = statement.where(ModerationQueueItem.id < page_token_id) 

335 else: 

336 # Ascending order: get items with larger IDs 

337 statement = statement.where(ModerationQueueItem.id > page_token_id) 

338 

339 # Apply filters 

340 if request.triggers: 

341 internal_triggers = [moderationtrigger2sql[t] for t in request.triggers] 

342 statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers)) 

343 

344 if request.object_type and request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED: 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true

345 internal_object_type = moderationobjecttype2sql[request.object_type] 

346 if internal_object_type: 

347 statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type) 

348 

349 if request.unresolved_only: 

350 statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

351 

352 if request.HasField("priority_min"): 

353 statement = statement.where(ModerationQueueItem.priority >= request.priority_min) 

354 

355 if request.HasField("priority_max"): 

356 statement = statement.where(ModerationQueueItem.priority <= request.priority_max) 

357 

358 if request.HasField("created_before"): 

359 created_before = request.created_before.ToDatetime() 

360 statement = statement.where(ModerationQueueItem.time_created < created_before) 

361 

362 if request.HasField("created_after"): 

363 created_after = request.created_after.ToDatetime() 

364 statement = statement.where(ModerationQueueItem.time_created > created_after) 

365 

366 if request.item_author_user_id: 

367 author_user_id = request.item_author_user_id 

368 

369 # Use EXISTS for efficient author filtering 

370 author_exists_clauses = [] 

371 for entry in get_moderated_models().values(): 

372 author_exists_clauses.append( 

373 exists().where( 

374 and_( 

375 entry.moderation_state_id_column == ModerationQueueItem.moderation_state_id, 

376 entry.author_column == author_user_id, 

377 ) 

378 ) 

379 ) 

380 statement = statement.where(or_(*author_exists_clauses)) 

381 

382 # Order by time created 

383 if request.newest_first: 

384 statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc()) 

385 else: 

386 statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc()) 

387 

388 queue_items = session.execute(statement.limit(page_size + 1)).scalars().all() 

389 

390 # Convert to proto 

391 queue_items_pb = [] 

392 for item in queue_items[:page_size]: 

393 # Fetch the moderation state for this queue item 

394 mod_state = session.execute( 

395 select(ModerationState).where(ModerationState.id == item.moderation_state_id) 

396 ).scalar_one() 

397 

398 queue_item_pb = moderation_pb2.ModerationQueueItemInfo( 

399 queue_item_id=item.id, 

400 moderation_state_id=item.moderation_state_id, 

401 time_created=Timestamp_from_datetime(item.time_created), 

402 trigger=moderationtrigger2api[item.trigger], 

403 reason=item.reason, 

404 is_resolved=item.resolved_by_log_id is not None, 

405 resolved_by_log_id=item.resolved_by_log_id or 0, 

406 moderation_state=moderation_state_to_pb(mod_state, session), 

407 priority=item.priority, 

408 ) 

409 

410 queue_items_pb.append(queue_item_pb) 

411 

412 return moderation_pb2.GetModerationQueueRes( 

413 queue_items=queue_items_pb, 

414 # Use the ID of the last returned item (not the extra fetched item) as the cursor 

415 next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None, 

416 ) 

417 

418 def GetModerationState( 

419 self, request: moderation_pb2.GetModerationStateReq, context: CouchersContext, session: Session 

420 ) -> moderation_pb2.GetModerationStateRes: 

421 """Get moderation state by object type and object ID""" 

422 object_type = moderationobjecttype2sql[request.object_type] 

423 if object_type is None: 

424 context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.") 

425 

426 moderation_state = session.execute( 

427 select(ModerationState) 

428 .where(ModerationState.object_type == object_type) 

429 .where(ModerationState.object_id == request.object_id) 

430 ).scalar_one_or_none() 

431 if moderation_state is None: 

432 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

433 

434 return moderation_pb2.GetModerationStateRes( 

435 moderation_state=moderation_state_to_pb(moderation_state, session), 

436 ) 

437 

438 def GetModerationLog( 

439 self, request: moderation_pb2.GetModerationLogReq, context: CouchersContext, session: Session 

440 ) -> moderation_pb2.GetModerationLogRes: 

441 """Get moderation log for a specific moderation state""" 

442 # Get the moderation state 

443 moderation_state = session.execute( 

444 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

445 ).scalar_one_or_none() 

446 if moderation_state is None: 

447 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

448 

449 # Get all log entries for this state, ordered by time (most recent first) 

450 log_entries = ( 

451 session.execute( 

452 select(ModerationLog) 

453 .where(ModerationLog.moderation_state_id == request.moderation_state_id) 

454 .order_by(ModerationLog.time.desc(), ModerationLog.id.desc()) 

455 ) 

456 .scalars() 

457 .all() 

458 ) 

459 

460 # Convert moderation state to proto first (while still in session) 

461 moderation_state_pb = moderation_state_to_pb(moderation_state, session) 

462 

463 # Convert to proto 

464 log_entries_pb = [] 

465 for entry in log_entries: 

466 log_entry_pb = moderation_pb2.ModerationLogEntryInfo( 

467 log_entry_id=entry.id, 

468 moderation_state_id=entry.moderation_state_id, 

469 time=Timestamp_from_datetime(entry.time), 

470 action=moderationaction2api[entry.action], 

471 moderator_user_id=entry.moderator_user_id, 

472 reason=entry.reason, 

473 ) 

474 

475 # Only include changed fields 

476 if entry.new_visibility is not None: 

477 log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility] 

478 if entry.new_priority is not None: 478 ↛ 479line 478 didn't jump to line 479 because the condition on line 478 was never true

479 log_entry_pb.new_priority = entry.new_priority 

480 if entry.queue_item_id is not None: 

481 log_entry_pb.queue_item_id = entry.queue_item_id 

482 

483 log_entries_pb.append(log_entry_pb) 

484 

485 return moderation_pb2.GetModerationLogRes( 

486 log_entries=log_entries_pb, 

487 moderation_state=moderation_state_pb, 

488 ) 

489 

490 def ModerateContent( 

491 self, request: moderation_pb2.ModerateContentReq, context: CouchersContext, session: Session 

492 ) -> moderation_pb2.ModerateContentRes: 

493 """Single moderation entrypoint, dispatching on action. 

494 

495 APPROVE/HIDE act on the state's visibility; FLAG/SET_PRIORITY/UNFLAG act on a single 

496 queue item. Every action appends a ModerationLog row. 

497 """ 

498 

499 moderation_state = session.execute( 

500 select(ModerationState).where(ModerationState.id == request.moderation_state_id) 

501 ).scalar_one_or_none() 

502 if moderation_state is None: 

503 context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.") 

504 

505 action = moderationaction2sql[request.action] 

506 if action is None: 506 ↛ 507line 506 didn't jump to line 507 because the condition on line 506 was never true

507 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:action_must_be_specified") 

508 

509 reason = request.reason or "Moderated by admin" 

510 object_type = moderation_state.object_type 

511 

512 if action in (ModerationAction.approve, ModerationAction.hide): 

513 new_visibility = moderationvisibility2sql[request.visibility] 

514 if new_visibility is None: 514 ↛ 515line 514 didn't jump to line 515 because the condition on line 514 was never true

515 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:visibility_must_be_specified") 

516 

517 old_visibility = moderation_state.visibility 

518 moderation_state.visibility = new_visibility 

519 moderation_state.updated = now() 

520 

521 log_entry = ModerationLog( 

522 moderation_state_id=moderation_state.id, 

523 action=action, 

524 moderator_user_id=context.user_id, 

525 new_visibility=new_visibility, 

526 reason=reason, 

527 ) 

528 session.add(log_entry) 

529 session.flush() 

530 

531 if request.clear_flags: 

532 open_items = ( 

533 session.execute( 

534 select(ModerationQueueItem) 

535 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

536 .where(ModerationQueueItem.resolved_by_log_id.is_(None)) 

537 ) 

538 .scalars() 

539 .all() 

540 ) 

541 for queue_item in open_items: 

542 _resolve_queue_item(queue_item, log_entry, action, object_type) 

543 session.flush() 

544 

545 observe_moderation_action(action, object_type) 

546 observe_moderation_visibility_transition(old_visibility, new_visibility, object_type) 

547 

548 if new_visibility in (ModerationVisibility.visible, ModerationVisibility.unlisted): 

549 _enqueue_pending_notifications(session, moderation_state.id) 

550 

551 elif action == ModerationAction.flag: 

552 trigger = moderationtrigger2sql[request.trigger] 

553 if trigger is None: 

554 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:trigger_must_be_specified") 

555 

556 queue_item = ModerationQueueItem( 

557 moderation_state_id=moderation_state.id, 

558 trigger=trigger, 

559 reason=reason, 

560 priority=request.priority, 

561 ) 

562 session.add(queue_item) 

563 session.flush() 

564 

565 log_entry = ModerationLog( 

566 moderation_state_id=moderation_state.id, 

567 action=ModerationAction.flag, 

568 moderator_user_id=context.user_id, 

569 queue_item_id=queue_item.id, 

570 reason=reason, 

571 ) 

572 session.add(log_entry) 

573 session.flush() 

574 

575 if request.supersede_queue_item_id: 

576 superseded = session.execute( 

577 select(ModerationQueueItem) 

578 .where(ModerationQueueItem.id == request.supersede_queue_item_id) 

579 .where(ModerationQueueItem.moderation_state_id == moderation_state.id) 

580 ).scalar_one_or_none() 

581 if superseded is None: 581 ↛ 582line 581 didn't jump to line 582 because the condition on line 581 was never true

582 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:queue_item_not_found") 

583 if superseded.resolved_by_log_id is None: 583 ↛ 587line 583 didn't jump to line 587 because the condition on line 583 was always true

584 _resolve_queue_item(superseded, log_entry, ModerationAction.flag, object_type) 

585 session.flush() 

586 

587 observe_moderation_action(ModerationAction.flag, object_type) 

588 observe_moderation_queue_item_created(trigger, object_type) 

589 

590 elif action == ModerationAction.set_priority: 

591 queue_item = self._get_queue_item_for_state(request, context, session, moderation_state.id) 

592 queue_item.priority = request.priority 

593 

594 log_entry = ModerationLog( 

595 moderation_state_id=moderation_state.id, 

596 action=ModerationAction.set_priority, 

597 moderator_user_id=context.user_id, 

598 queue_item_id=queue_item.id, 

599 new_priority=request.priority, 

600 reason=reason, 

601 ) 

602 session.add(log_entry) 

603 session.flush() 

604 

605 observe_moderation_action(ModerationAction.set_priority, object_type) 

606 

607 elif action == ModerationAction.unflag: 607 ↛ 628line 607 didn't jump to line 628 because the condition on line 607 was always true

608 queue_item = self._get_queue_item_for_state(request, context, session, moderation_state.id) 

609 moderation_state.updated = now() 

610 

611 log_entry = ModerationLog( 

612 moderation_state_id=moderation_state.id, 

613 action=ModerationAction.unflag, 

614 moderator_user_id=context.user_id, 

615 queue_item_id=queue_item.id, 

616 reason=reason, 

617 ) 

618 session.add(log_entry) 

619 session.flush() 

620 

621 if queue_item.resolved_by_log_id is None: 621 ↛ 625line 621 didn't jump to line 625 because the condition on line 621 was always true

622 _resolve_queue_item(queue_item, log_entry, ModerationAction.unflag, object_type) 

623 session.flush() 

624 

625 observe_moderation_action(ModerationAction.unflag, object_type) 

626 

627 else: 

628 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:unsupported_action") 

629 

630 return moderation_pb2.ModerateContentRes( 

631 moderation_state=moderation_state_to_pb(moderation_state, session), 

632 ) 

633 

634 def _get_queue_item_for_state( 

635 self, 

636 request: moderation_pb2.ModerateContentReq, 

637 context: CouchersContext, 

638 session: Session, 

639 moderation_state_id: int, 

640 ) -> ModerationQueueItem: 

641 """Resolve the request's target queue item, asserting it belongs to the given state.""" 

642 if not request.queue_item_id: 

643 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:queue_item_id_must_be_specified") 

644 queue_item = session.execute( 

645 select(ModerationQueueItem) 

646 .where(ModerationQueueItem.id == request.queue_item_id) 

647 .where(ModerationQueueItem.moderation_state_id == moderation_state_id) 

648 ).scalar_one_or_none() 

649 if queue_item is None: 649 ↛ 650line 649 didn't jump to line 650 because the condition on line 649 was never true

650 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "admin:queue_item_not_found") 

651 return queue_item 

652 

653 def SetUserContentVisibility( 

654 self, request: moderation_pb2.SetUserContentVisibilityReq, context: CouchersContext, session: Session 

655 ) -> moderation_pb2.SetUserContentVisibilityRes: 

656 """Bulk-set visibility on every UMS-governed object authored by the given user. 

657 

658 If from_visibility is non-empty, only states currently at one of those visibilities are swept. 

659 """ 

660 new_visibility = moderationvisibility2sql[request.visibility] 

661 if new_visibility is None: 

662 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:visibility_must_be_specified") 

663 

664 raw_from_visibilities = {moderationvisibility2sql.get(v) for v in request.from_visibility} 

665 if None in raw_from_visibilities: 

666 context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "admin:visibility_must_be_specified") 

667 from_visibilities: set[ModerationVisibility] | None = { 

668 v for v in raw_from_visibilities if v is not None 

669 } or None 

670 

671 user = session.execute(select(User).where(User.id == request.user_id)).scalar_one_or_none() 

672 if not user: 

673 context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found") 

674 

675 updated_count = bulk_set_user_content_visibility( 

676 session=session, 

677 user=user, 

678 new_visibility=new_visibility, 

679 moderator_user_id=context.user_id, 

680 from_visibilities=from_visibilities, 

681 reason=request.reason or None, 

682 ) 

683 

684 # Import here to avoid circular dependency 

685 from couchers.servicers.admin import log_admin_action # noqa: PLC0415 

686 

687 log_admin_action( 

688 session, 

689 context, 

690 user, 

691 "set_user_content_visibility", 

692 note=request.reason or None, 

693 tag=new_visibility.name, 

694 level=AdminActionLevel.high, 

695 ) 

696 

697 return moderation_pb2.SetUserContentVisibilityRes(updated_count=updated_count) 

698 

699 def ListModerationStates( 

700 self, request: moderation_pb2.ListModerationStatesReq, context: CouchersContext, session: Session 

701 ) -> moderation_pb2.ListModerationStatesRes: 

702 """Chronological, paginated list of ModerationState rows. Optional author_user_id filter.""" 

703 page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH) 

704 

705 statement = select(ModerationState) 

706 

707 if request.page_token: 

708 page_token_id = int(request.page_token) 

709 if request.newest_first: 709 ↛ 710line 709 didn't jump to line 710 because the condition on line 709 was never true

710 statement = statement.where(ModerationState.id < page_token_id) 

711 else: 

712 statement = statement.where(ModerationState.id > page_token_id) 

713 

714 if request.author_user_id: 

715 author_exists_clauses = [] 

716 for entry in get_moderated_models().values(): 

717 author_exists_clauses.append( 

718 exists().where( 

719 and_( 

720 entry.moderation_state_id_column == ModerationState.id, 

721 entry.author_column == request.author_user_id, 

722 ) 

723 ) 

724 ) 

725 statement = statement.where(or_(*author_exists_clauses)) 

726 

727 if request.newest_first: 

728 statement = statement.order_by(ModerationState.created.desc(), ModerationState.id.desc()) 

729 else: 

730 statement = statement.order_by(ModerationState.created.asc(), ModerationState.id.asc()) 

731 

732 states = session.execute(statement.limit(page_size + 1)).scalars().all() 

733 

734 state_pbs = [moderation_state_to_pb(state, session) for state in states[:page_size]] 

735 

736 return moderation_pb2.ListModerationStatesRes( 

737 moderation_states=state_pbs, 

738 next_page_token=str(states[page_size - 1].id) if len(states) > page_size else None, 

739 )