Coverage for src/couchers/rate_limits/check.py: 100%

30 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-12-20 11:53 +0000

1import logging 

2from collections.abc import Sequence 

3 

4from sqlalchemy import RowMapping, exists, select 

5from sqlalchemy.orm import Session 

6 

7from couchers.models import RateLimitAction, RateLimitViolation 

8from couchers.rate_limits.definitions import RATE_LIMIT_DEFINITIONS, RATE_LIMIT_INTERVAL 

9from couchers.tasks import send_rate_limit_violation_report_email 

10from couchers.utils import now 

11 

12logger = logging.getLogger(__name__) 

13 

14 

def _get_user_events_in_past_time_interval(
    session: Session, user_id: int
) -> dict[RateLimitAction, Sequence[RowMapping]]:
    """
    Gather the user's relevant events from the last rate limit interval for the mod email.

    Every member of RateLimitAction is looked up in RATE_LIMIT_DEFINITIONS, and that
    definition's mod_email_information_query is run for the given user. The result maps
    each action to whatever rows its query returned.
    """
    events_by_action: dict[RateLimitAction, Sequence[RowMapping]] = {}
    for rate_limited_action in RateLimitAction:
        definition = RATE_LIMIT_DEFINITIONS[rate_limited_action]
        events_by_action[rate_limited_action] = definition.mod_email_information_query(session, user_id)
    return events_by_action

23 

24 

def _save_rate_limit_violation(
    session: Session, user_id: int, action: RateLimitAction, is_hard_limit: bool
) -> RateLimitViolation:
    """
    Persist a new RateLimitViolation row for the user and hand it back.

    The row is added to the session and flushed, but not committed here.
    """
    new_violation = RateLimitViolation(user_id=user_id, action=action, is_hard_limit=is_hard_limit)
    session.add(new_violation)
    # Flush so the INSERT is sent to the database before the caller uses the object
    # (presumably so database-generated columns are populated -- confirm against the model).
    session.flush()
    return new_violation

37 

38 

def _user_has_violated_rate_limit_in_past_time_interval(
    session: Session, user_id: int, action: RateLimitAction, is_hard_limit: bool
) -> bool:
    """
    Tell whether a matching RateLimitViolation was recorded within the last RATE_LIMIT_INTERVAL.

    A violation matches when it belongs to the given user and action, has the same
    is_hard_limit flag, and was created at or after now() - RATE_LIMIT_INTERVAL.
    """
    interval_start = now() - RATE_LIMIT_INTERVAL
    matching_violation_exists = exists().where(
        RateLimitViolation.user_id == user_id,
        RateLimitViolation.action == action,
        RateLimitViolation.created >= interval_start,
        RateLimitViolation.is_hard_limit == is_hard_limit,
    )
    # SELECT EXISTS(...) yields exactly one row holding a single boolean.
    result = session.execute(select(matching_violation_exists))
    return result.scalar_one()

53 

54 

def process_rate_limits_and_check_abort(session: Session, user_id: int, action: RateLimitAction) -> bool:
    """
    Evaluate the user's action count against the hard and warning limits for this action.

    For each limit that has been reached, a RateLimitViolation is recorded and a report
    email is sent to the moderation team, unless a violation of the same kind already
    exists within the last rate limit interval.

    Returns True if the user has reached the hard rate limit, False otherwise.
    """
    definition = RATE_LIMIT_DEFINITIONS[action]
    action_count = definition.count_actions_query(session, user_id)

    # The hard limit is checked first: reaching it returns immediately, so the
    # warning limit is only evaluated when the hard limit has not been reached.
    thresholds = (
        (definition.hard_limit, True),
        (definition.warning_limit, False),
    )
    for threshold, hard in thresholds:
        if action_count < threshold:
            continue

        already_reported = _user_has_violated_rate_limit_in_past_time_interval(
            session=session, user_id=user_id, action=action, is_hard_limit=hard
        )
        if not already_reported:
            # Record the violation before collecting events and sending the report.
            violation = _save_rate_limit_violation(
                session=session, user_id=user_id, action=action, is_hard_limit=hard
            )
            send_rate_limit_violation_report_email(
                session=session,
                rate_limit_violation=violation,
                threshold=threshold,
                events=_get_user_events_in_past_time_interval(session=session, user_id=user_id),
            )

        if hard:
            return True

    return False