Coverage for app / backend / src / couchers / servicers / moderation.py: 87%

201 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-05 09:44 +0000

1import logging 

2from typing import TYPE_CHECKING 

3 

4import grpc 

5from sqlalchemy import and_, exists, not_, or_, select 

6from sqlalchemy.orm import Session 

7 

8from couchers.context import CouchersContext 

9from couchers.jobs.enqueue import queue_job 

10from couchers.metrics import ( 

11 observe_moderation_action, 

12 observe_moderation_queue_item_created, 

13 observe_moderation_queue_item_resolved, 

14 observe_moderation_queue_resolution_time, 

15 observe_moderation_visibility_transition, 

16) 

17from couchers.models import ( 

18 AdminActionLevel, 

19 Event, 

20 EventOccurrence, 

21 FriendRelationship, 

22 GroupChat, 

23 HostRequest, 

24 Message, 

25 MessageType, 

26 ModerationAction, 

27 ModerationLog, 

28 ModerationObjectType, 

29 ModerationQueueItem, 

30 ModerationState, 

31 ModerationTrigger, 

32 ModerationVisibility, 

33 Notification, 

34 NotificationDelivery, 

35 User, 

36) 

37from couchers.proto import moderation_pb2, moderation_pb2_grpc 

38from couchers.proto.internal import jobs_pb2 

39from couchers.utils import Timestamp_from_datetime, now 

40 

41if TYPE_CHECKING: 

42 from couchers.sql import _ModeratedContent 

43 

logger = logging.getLogger(__name__)

# Upper bound on (and default for) the page size of paginated moderation RPCs.
MAX_PAGINATION_LENGTH = 1_000

# Moderation enum mappings.
#
# Each "<name>2api" table maps the internal SQL enum (or None) to the proto enum value and is
# written out by hand; the matching "<name>2sql" table is its exact inverse and is derived from it
# so the two directions cannot drift apart.
moderationvisibility2api = {
    None: moderation_pb2.MODERATION_VISIBILITY_UNSPECIFIED,
    ModerationVisibility.hidden: moderation_pb2.MODERATION_VISIBILITY_HIDDEN,
    ModerationVisibility.shadowed: moderation_pb2.MODERATION_VISIBILITY_SHADOWED,
    ModerationVisibility.visible: moderation_pb2.MODERATION_VISIBILITY_VISIBLE,
    ModerationVisibility.unlisted: moderation_pb2.MODERATION_VISIBILITY_UNLISTED,
}
moderationvisibility2sql = {api_value: sql_value for sql_value, api_value in moderationvisibility2api.items()}

moderationtrigger2api = {
    None: moderation_pb2.MODERATION_TRIGGER_UNSPECIFIED,
    ModerationTrigger.initial_review: moderation_pb2.MODERATION_TRIGGER_INITIAL_REVIEW,
    ModerationTrigger.user_flag: moderation_pb2.MODERATION_TRIGGER_USER_FLAG,
    ModerationTrigger.machine_flag: moderation_pb2.MODERATION_TRIGGER_MACHINE_FLAG,
    ModerationTrigger.moderator_review: moderation_pb2.MODERATION_TRIGGER_MODERATOR_REVIEW,
}
moderationtrigger2sql = {api_value: sql_value for sql_value, api_value in moderationtrigger2api.items()}

moderationaction2api = {
    None: moderation_pb2.MODERATION_ACTION_UNSPECIFIED,
    ModerationAction.create: moderation_pb2.MODERATION_ACTION_CREATE,
    ModerationAction.approve: moderation_pb2.MODERATION_ACTION_APPROVE,
    ModerationAction.hide: moderation_pb2.MODERATION_ACTION_HIDE,
    ModerationAction.flag: moderation_pb2.MODERATION_ACTION_FLAG,
    ModerationAction.unflag: moderation_pb2.MODERATION_ACTION_UNFLAG,
    ModerationAction.bulk_set_visibility: moderation_pb2.MODERATION_ACTION_BULK_SET_VISIBILITY,
}
moderationaction2sql = {api_value: sql_value for sql_value, api_value in moderationaction2api.items()}

moderationobjecttype2api = {
    None: moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED,
    ModerationObjectType.host_request: moderation_pb2.MODERATION_OBJECT_TYPE_HOST_REQUEST,
    ModerationObjectType.group_chat: moderation_pb2.MODERATION_OBJECT_TYPE_GROUP_CHAT,
    ModerationObjectType.friend_request: moderation_pb2.MODERATION_OBJECT_TYPE_FRIEND_REQUEST,
    ModerationObjectType.event_occurrence: moderation_pb2.MODERATION_OBJECT_TYPE_EVENT_OCCURRENCE,
}
moderationobjecttype2sql = {api_value: sql_value for sql_value, api_value in moderationobjecttype2api.items()}

# Mapping from ModerationObjectType to the SQLAlchemy model class that stores objects of that type.
# Code in this module reads `moderation_state_id` and `__moderation_author_column__` off these models.
moderationobjecttype2model: dict[ModerationObjectType, _ModeratedContent] = {
    ModerationObjectType.host_request: HostRequest,
    ModerationObjectType.group_chat: GroupChat,
    ModerationObjectType.friend_request: FriendRelationship,
    ModerationObjectType.event_occurrence: EventOccurrence,
}

124 

125 

def _enqueue_pending_notifications(session: Session, moderation_state_id: int) -> None:
    """Queue a `handle_notification` job for each undelivered notification of a moderation state.

    A notification counts as undelivered when it is linked to `moderation_state_id` and no
    `NotificationDelivery` row references it.
    """
    has_delivery = exists().where(NotificationDelivery.notification_id == Notification.id)
    undelivered_query = (
        select(Notification)
        .where(Notification.moderation_state_id == moderation_state_id)
        .where(not_(has_delivery))
    )
    # Materialise the result before queueing jobs, which use the same session.
    undelivered = session.execute(undelivered_query).scalars().all()

    # Import here to avoid circular dependency
    from couchers.notifications.background import handle_notification  # noqa: PLC0415

    for pending in undelivered:
        job_payload = jobs_pb2.HandleNotificationPayload(notification_id=pending.id)
        queue_job(session, job=handle_notification, payload=job_payload)

147 

148 

def _first_text_message(session: Session, conversation_id: int) -> str | None:
    """Return the text of the lowest-ID text message in the conversation, or None if there is none."""
    first_text_query = (
        select(Message.text)
        .where(Message.conversation_id == conversation_id)
        .where(Message.message_type == MessageType.text)
        .order_by(Message.id.asc())
        .limit(1)
    )
    return session.execute(first_text_query).scalar_one_or_none()


def moderation_state_to_pb(state: ModerationState, session: Session) -> moderation_pb2.ModerationStateInfo:
    """Build the `ModerationStateInfo` proto for a `ModerationState` row.

    The author user ID and a text preview are looked up from the moderated object itself:
    host requests and group chats use their first text message, friend requests have no text,
    and event occurrences use "<title>\\n\\n<content>".

    Raises:
        ValueError: if the state's object type is not one of the four handled types.
    """
    kind = state.object_type
    target_id = state.object_id

    if kind == ModerationObjectType.host_request:
        author_query = select(HostRequest.initiator_user_id).where(HostRequest.conversation_id == target_id)
        author_user_id = session.execute(author_query).scalar_one()
        content = _first_text_message(session, target_id)
    elif kind == ModerationObjectType.group_chat:
        author_query = select(GroupChat.creator_id).where(GroupChat.conversation_id == target_id)
        author_user_id = session.execute(author_query).scalar_one()
        content = _first_text_message(session, target_id)
    elif kind == ModerationObjectType.friend_request:
        author_query = select(FriendRelationship.from_user_id).where(FriendRelationship.id == target_id)
        author_user_id = session.execute(author_query).scalar_one()
        # Friend requests have no text content
        content = None
    elif kind == ModerationObjectType.event_occurrence:
        event_query = (
            select(EventOccurrence.creator_user_id, Event.title, EventOccurrence.content)
            .join(Event, Event.id == EventOccurrence.event_id)
            .where(EventOccurrence.id == target_id)
        )
        author_user_id, title, description = session.execute(event_query).one()
        content = f"{title}\n\n{description}"
    else:
        raise ValueError(f"Unsupported moderation object type: {kind}")

    return moderation_pb2.ModerationStateInfo(
        moderation_state_id=state.id,
        object_type=moderationobjecttype2api[kind],
        object_id=target_id,
        visibility=moderationvisibility2api[state.visibility],
        created=Timestamp_from_datetime(state.created),
        updated=Timestamp_from_datetime(state.updated),
        author_user_id=author_user_id,
        # A missing preview is sent as an empty string.
        content=content or "",
    )

207 

208 

def _resolve_pending_queue_items(
    session: Session, moderation_state_id: int, log_entry_id: int
) -> list[ModerationQueueItem]:
    """Mark every unresolved queue item of a moderation state as resolved by the given log entry.

    All unresolved items are resolved, not just one: the same state can be flagged several times,
    and fetching them with `scalar_one_or_none()` would raise `MultipleResultsFound` in that case.

    Returns:
        The queue items that were resolved (newest first); empty if nothing was pending.
    """
    pending_items = list(
        session.execute(
            select(ModerationQueueItem)
            .where(ModerationQueueItem.moderation_state_id == moderation_state_id)
            .where(ModerationQueueItem.resolved_by_log_id.is_(None))
            .order_by(ModerationQueueItem.time_created.desc())
        )
        .scalars()
        .all()
    )
    for queue_item in pending_items:
        queue_item.resolved_by_log_id = log_entry_id
    if pending_items:
        session.flush()
    return pending_items


def _observe_queue_items_resolved(
    queue_items: list[ModerationQueueItem], action: ModerationAction, object_type: ModerationObjectType
) -> None:
    """Record the "resolved" and "resolution time" metrics for each resolved queue item."""
    resolved_at = now()
    for queue_item in queue_items:
        observe_moderation_queue_item_resolved(queue_item.trigger, action, object_type)
        observe_moderation_queue_resolution_time(
            queue_item.trigger,
            action,
            object_type,
            (resolved_at - queue_item.time_created).total_seconds(),
        )


class Moderation(moderation_pb2_grpc.ModerationServicer):
    """Servicer for the Moderation gRPC service: queue/log inspection and moderation actions."""

    def GetModerationQueue(
        self, request: moderation_pb2.GetModerationQueueReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationQueueRes:
        """Get moderation queue items with optional filtering

        Supported filters: triggers, object type, unresolved-only, created before/after and the
        author of the moderated item. Results are paginated with an ID-based page token.

        Aborts with INVALID_ARGUMENT if the page token is not an integer.
        """

        page_size = min(MAX_PAGINATION_LENGTH, request.page_size or MAX_PAGINATION_LENGTH)

        # Build query
        statement = select(ModerationQueueItem)

        # Apply page token filter based on ordering direction
        # NOTE(review): the cursor filters on `id` while the ordering below is by `time_created`
        # first; this assumes IDs and creation times increase together — confirm.
        if request.page_token:
            try:
                page_token_id = int(request.page_token)
            except ValueError:
                # A malformed token is a client error, not an internal failure.
                context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Invalid page token.")
            if request.newest_first:
                # Descending order: get items with smaller IDs
                statement = statement.where(ModerationQueueItem.id < page_token_id)
            else:
                # Ascending order: get items with larger IDs
                statement = statement.where(ModerationQueueItem.id > page_token_id)

        # Apply filters
        if request.triggers:
            internal_triggers = [moderationtrigger2sql[t] for t in request.triggers]
            statement = statement.where(ModerationQueueItem.trigger.in_(internal_triggers))

        if request.object_type and request.object_type != moderation_pb2.MODERATION_OBJECT_TYPE_UNSPECIFIED:
            internal_object_type = moderationobjecttype2sql[request.object_type]
            if internal_object_type:
                statement = statement.join(ModerationState).where(ModerationState.object_type == internal_object_type)

        if request.unresolved_only:
            statement = statement.where(ModerationQueueItem.resolved_by_log_id.is_(None))

        if request.HasField("created_before"):
            created_before = request.created_before.ToDatetime()
            statement = statement.where(ModerationQueueItem.time_created < created_before)

        if request.HasField("created_after"):
            created_after = request.created_after.ToDatetime()
            statement = statement.where(ModerationQueueItem.time_created > created_after)

        if request.item_author_user_id:
            author_user_id = request.item_author_user_id

            # Use EXISTS for efficient author filtering: one EXISTS per moderated model, OR-ed together
            author_exists_clauses = [
                exists().where(
                    and_(
                        model.moderation_state_id == ModerationQueueItem.moderation_state_id,
                        getattr(model, model.__moderation_author_column__) == author_user_id,
                    )
                )
                for model in moderationobjecttype2model.values()
            ]
            statement = statement.where(or_(*author_exists_clauses))

        # Order by time created
        if request.newest_first:
            statement = statement.order_by(ModerationQueueItem.time_created.desc(), ModerationQueueItem.id.desc())
        else:
            statement = statement.order_by(ModerationQueueItem.time_created.asc(), ModerationQueueItem.id.asc())

        # Fetch one extra row to learn whether another page exists
        queue_items = session.execute(statement.limit(page_size + 1)).scalars().all()

        # Convert to proto
        queue_items_pb = []
        for item in queue_items[:page_size]:
            # Fetch the moderation state for this queue item
            mod_state = session.execute(
                select(ModerationState).where(ModerationState.id == item.moderation_state_id)
            ).scalar_one()

            queue_item_pb = moderation_pb2.ModerationQueueItemInfo(
                queue_item_id=item.id,
                moderation_state_id=item.moderation_state_id,
                time_created=Timestamp_from_datetime(item.time_created),
                trigger=moderationtrigger2api[item.trigger],
                reason=item.reason,
                is_resolved=item.resolved_by_log_id is not None,
                resolved_by_log_id=item.resolved_by_log_id or 0,
                moderation_state=moderation_state_to_pb(mod_state, session),
            )

            queue_items_pb.append(queue_item_pb)

        return moderation_pb2.GetModerationQueueRes(
            queue_items=queue_items_pb,
            # Use the ID of the last returned item (not the extra fetched item) as the cursor
            next_page_token=str(queue_items[page_size - 1].id) if len(queue_items) > page_size else None,
        )

    def GetModerationState(
        self, request: moderation_pb2.GetModerationStateReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationStateRes:
        """Get moderation state by object type and object ID

        Aborts with INVALID_ARGUMENT if the object type is unspecified and with NOT_FOUND if no
        moderation state matches.
        """
        object_type = moderationobjecttype2sql[request.object_type]
        if object_type is None:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "Object type must be specified.")

        moderation_state = session.execute(
            select(ModerationState)
            .where(ModerationState.object_type == object_type)
            .where(ModerationState.object_id == request.object_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        return moderation_pb2.GetModerationStateRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def GetModerationLog(
        self, request: moderation_pb2.GetModerationLogReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.GetModerationLogRes:
        """Get moderation log for a specific moderation state

        Aborts with NOT_FOUND if the moderation state does not exist.
        """
        # Get the moderation state
        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        # Get all log entries for this state, ordered by time (most recent first)
        log_entries = (
            session.execute(
                select(ModerationLog)
                .where(ModerationLog.moderation_state_id == request.moderation_state_id)
                .order_by(ModerationLog.time.desc(), ModerationLog.id.desc())
            )
            .scalars()
            .all()
        )

        # Convert moderation state to proto first (while still in session)
        moderation_state_pb = moderation_state_to_pb(moderation_state, session)

        # Convert to proto
        log_entries_pb = []
        for entry in log_entries:
            log_entry_pb = moderation_pb2.ModerationLogEntryInfo(
                log_entry_id=entry.id,
                moderation_state_id=entry.moderation_state_id,
                time=Timestamp_from_datetime(entry.time),
                action=moderationaction2api[entry.action],
                moderator_user_id=entry.moderator_user_id,
                reason=entry.reason,
            )

            # Only include changed fields
            if entry.new_visibility is not None:
                log_entry_pb.new_visibility = moderationvisibility2api[entry.new_visibility]

            log_entries_pb.append(log_entry_pb)

        return moderation_pb2.GetModerationLogRes(
            log_entries=log_entries_pb,
            moderation_state=moderation_state_pb,
        )

    def ModerateContent(
        self, request: moderation_pb2.ModerateContentReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.ModerateContentRes:
        """Unified moderation action - takes both action and visibility explicitly

        Sets the new visibility, writes a log entry, resolves all pending queue items of the state
        and, when the content becomes visible or unlisted, enqueues its undelivered notifications.

        Aborts with NOT_FOUND if the moderation state does not exist and with INVALID_ARGUMENT if
        the action or the visibility is unspecified.
        """

        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if moderation_state is None:
            context.abort(grpc.StatusCode.NOT_FOUND, "Moderation state not found.")

        # Convert proto enums to internal enums
        action = moderationaction2sql[request.action]
        if action is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "action_must_be_specified")

        new_visibility = moderationvisibility2sql[request.visibility]
        if new_visibility is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "visibility_must_be_specified")

        reason = request.reason or "Moderated by admin"

        # Track old visibility for metrics
        old_visibility = moderation_state.visibility

        # Update visibility
        moderation_state.visibility = new_visibility
        moderation_state.updated = now()

        # Log the action
        log_entry = ModerationLog(
            moderation_state_id=moderation_state.id,
            action=action,
            moderator_user_id=context.user_id,
            new_visibility=new_visibility,
            reason=reason,
        )
        session.add(log_entry)
        # Flush so the log entry has an ID that queue items can reference
        session.flush()

        # Resolve any pending queue items
        resolved_items = _resolve_pending_queue_items(session, moderation_state.id, log_entry.id)
        _observe_queue_items_resolved(resolved_items, action, moderation_state.object_type)

        observe_moderation_action(action, moderation_state.object_type)
        observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type)

        # If visibility becomes VISIBLE or UNLISTED, trigger pending notifications
        if new_visibility in (ModerationVisibility.visible, ModerationVisibility.unlisted):
            _enqueue_pending_notifications(session, moderation_state.id)

        return moderation_pb2.ModerateContentRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def FlagContentForReview(
        self, request: moderation_pb2.FlagContentForReviewReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.FlagContentForReviewRes:
        """Flag content for review by adding it to the moderation queue

        An unspecified trigger defaults to `initial_review`.

        Aborts with NOT_FOUND if the moderation state does not exist.
        """

        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if not moderation_state:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found")

        trigger = moderationtrigger2sql[request.trigger] or ModerationTrigger.initial_review
        reason = request.reason or "Flagged by admin for review"

        # Add to moderation queue
        queue_item = ModerationQueueItem(
            moderation_state_id=request.moderation_state_id,
            trigger=trigger,
            reason=reason,
        )
        session.add(queue_item)
        # Flush so the new queue item's ID is available for the response below
        session.flush()

        observe_moderation_action(ModerationAction.flag, moderation_state.object_type)
        observe_moderation_queue_item_created(trigger, moderation_state.object_type)

        queue_item_pb = moderation_pb2.ModerationQueueItemInfo(
            queue_item_id=queue_item.id,
            moderation_state_id=queue_item.moderation_state_id,
            time_created=Timestamp_from_datetime(queue_item.time_created),
            trigger=moderationtrigger2api[queue_item.trigger],
            reason=queue_item.reason,
            is_resolved=False,
            resolved_by_log_id=0,
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

        return moderation_pb2.FlagContentForReviewRes(queue_item=queue_item_pb)

    def UnflagContent(
        self, request: moderation_pb2.UnflagContentReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.UnflagContentRes:
        """Unflag content by resolving pending queue items

        Writes an `unflag` log entry (visibility is left unchanged) and resolves all pending queue
        items of the state against it.

        Aborts with NOT_FOUND if the moderation state does not exist.
        """

        moderation_state = session.execute(
            select(ModerationState).where(ModerationState.id == request.moderation_state_id)
        ).scalar_one_or_none()
        if not moderation_state:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "moderation_state_not_found")

        reason = request.reason or "Unflagged by admin"

        # Update moderation state
        moderation_state.updated = now()

        # Log the unflag action
        log_entry = ModerationLog(
            moderation_state_id=moderation_state.id,
            action=ModerationAction.unflag,
            moderator_user_id=context.user_id,
            new_visibility=None,
            reason=reason,
        )
        session.add(log_entry)
        # Flush so the log entry has an ID that queue items can reference
        session.flush()

        # Resolve any pending queue items
        resolved_items = _resolve_pending_queue_items(session, moderation_state.id, log_entry.id)
        _observe_queue_items_resolved(resolved_items, ModerationAction.unflag, moderation_state.object_type)

        observe_moderation_action(ModerationAction.unflag, moderation_state.object_type)

        return moderation_pb2.UnflagContentRes(
            moderation_state=moderation_state_to_pb(moderation_state, session),
        )

    def SetUserContentVisibility(
        self, request: moderation_pb2.SetUserContentVisibilityReq, context: CouchersContext, session: Session
    ) -> moderation_pb2.SetUserContentVisibilityRes:
        """Bulk-set visibility on every UMS-governed object authored by the given user.

        States that already have the requested visibility are skipped. The response carries the
        number of states that were actually changed.

        Aborts with INVALID_ARGUMENT if the visibility is unspecified and with NOT_FOUND if the
        user does not exist.
        """
        new_visibility = moderationvisibility2sql[request.visibility]
        if new_visibility is None:
            context.abort_with_error_code(grpc.StatusCode.INVALID_ARGUMENT, "visibility_must_be_specified")

        user = session.execute(select(User).where(User.id == request.user_id)).scalar_one_or_none()
        if not user:
            context.abort_with_error_code(grpc.StatusCode.NOT_FOUND, "user_not_found")

        reason = request.reason or f"Bulk visibility update for user {user.id} to {new_visibility.name}"

        # One EXISTS per moderated model: "this state belongs to an object authored by the user"
        author_exists_clauses = [
            exists().where(
                and_(
                    model.moderation_state_id == ModerationState.id,
                    getattr(model, model.__moderation_author_column__) == user.id,
                )
            )
            for model in moderationobjecttype2model.values()
        ]

        states = session.execute(select(ModerationState).where(or_(*author_exists_clauses))).scalars().all()

        updated_count = 0
        for moderation_state in states:
            if moderation_state.visibility == new_visibility:
                continue

            old_visibility = moderation_state.visibility
            moderation_state.visibility = new_visibility
            moderation_state.updated = now()

            log_entry = ModerationLog(
                moderation_state_id=moderation_state.id,
                action=ModerationAction.bulk_set_visibility,
                moderator_user_id=context.user_id,
                new_visibility=new_visibility,
                reason=reason,
            )
            session.add(log_entry)
            # Flush so the log entry has an ID that queue items can reference
            session.flush()

            # Resolve any pending queue items; queue-resolution metrics are not recorded for bulk updates
            _resolve_pending_queue_items(session, moderation_state.id, log_entry.id)

            observe_moderation_action(ModerationAction.bulk_set_visibility, moderation_state.object_type)
            observe_moderation_visibility_transition(old_visibility, new_visibility, moderation_state.object_type)

            if new_visibility in (ModerationVisibility.visible, ModerationVisibility.unlisted):
                _enqueue_pending_notifications(session, moderation_state.id)

            updated_count += 1

        # Import here to avoid circular dependency
        from couchers.servicers.admin import log_admin_action  # noqa: PLC0415

        log_admin_action(
            session,
            context,
            user,
            "set_user_content_visibility",
            note=request.reason or None,
            tag=new_visibility.name,
            level=AdminActionLevel.high,
        )

        return moderation_pb2.SetUserContentVisibilityRes(updated_count=updated_count)