Coverage for app / backend / src / couchers / moderation / utils.py: 100%

20 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-19 14:14 +0000

1""" 

2Utility functions for the Unified Moderation System (UMS) 

3""" 

4 

5from collections.abc import Callable 

6 

7from sqlalchemy.orm import Session 

8 

9from couchers.metrics import observe_moderation_action, observe_moderation_queue_item_created 

10from couchers.models import ( 

11 ModerationAction, 

12 ModerationLog, 

13 ModerationObjectType, 

14 ModerationQueueItem, 

15 ModerationState, 

16 ModerationTrigger, 

17 ModerationVisibility, 

18) 

19 

20 

def create_moderation(
    session: Session,
    object_type: ModerationObjectType,
    object_id: int | Callable[[int], int],
    creator_user_id: int,
) -> ModerationState:
    """
    Create a shadowed ModerationState together with its initial log entry and queue item.

    ``object_id`` is either the id of the moderated object, or a callback used to
    break a circular dependency: the callback receives the id of the freshly
    flushed ModerationState, and the value it returns is stored as the state's
    ``object_id``.

    A ModerationLog row (action ``create``, attributed to ``creator_user_id``) and
    a ModerationQueueItem row (trigger ``initial_review``) are added and flushed,
    the matching metrics are recorded, and the new ModerationState is returned.
    """
    # In callback mode the real object id is not known yet, so the state is
    # first flushed with a placeholder of 0 and patched afterwards.
    initial_object_id = 0 if callable(object_id) else object_id

    state = ModerationState(
        object_type=object_type,
        object_id=initial_object_id,
        visibility=ModerationVisibility.shadowed,
    )
    session.add(state)
    session.flush()

    if callable(object_id):
        # The callback gets the state's id and hands back the real object id.
        state.object_id = object_id(state.id)

    creation_log = ModerationLog(
        moderation_state_id=state.id,
        action=ModerationAction.create,
        moderator_user_id=creator_user_id,
        new_visibility=ModerationVisibility.shadowed,
        reason="Object created.",
    )
    session.add(creation_log)

    review_item = ModerationQueueItem(
        moderation_state_id=state.id,
        trigger=ModerationTrigger.initial_review,
        reason="Object created.",
    )
    session.add(review_item)
    session.flush()

    observe_moderation_action(ModerationAction.create, object_type)
    observe_moderation_queue_item_created(ModerationTrigger.initial_review, object_type)

    return state