Coverage for src/couchers/rate_limits/check.py: 97%

31 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-07-12 05:54 +0000

1import logging 

2from typing import TYPE_CHECKING 

3 

4from sqlalchemy import exists, select 

5 

6from couchers.models import RateLimitAction, RateLimitViolation 

7from couchers.rate_limits.definitions import RATE_LIMIT_DEFINITIONS, RATE_LIMIT_INTERVAL 

8from couchers.tasks import send_rate_limit_violation_report_email 

9from couchers.utils import now 

10 

11if TYPE_CHECKING: 

12 from sqlalchemy.orm import Session 

13 

14 

15logger = logging.getLogger(__name__) 

16 

17 

def _get_user_events_in_past_time_interval(session: "Session", user_id: int) -> dict[RateLimitAction, list[dict]]:
    """
    Collect, per rate-limited action, the user's event details for the moderator email.

    Runs the ``mod_email_information_query`` of every action's entry in
    RATE_LIMIT_DEFINITIONS for the given user and returns the results keyed by action.
    Which events each query selects (including the time window it looks at) is decided
    by the definition's query, not by this function.
    """
    # "Session" is a string annotation because the class is only imported under TYPE_CHECKING.
    return {
        action: RATE_LIMIT_DEFINITIONS[action].mod_email_information_query(session, user_id)
        for action in RateLimitAction
    }

24 

25 

def _save_rate_limit_violation(
    session: "Session", user_id: int, action: RateLimitAction, is_hard_limit: bool
) -> RateLimitViolation:
    """
    Record a new rate limit violation for the user and hand back the created object.

    The violation is added to the session and the session is flushed; no commit happens here.
    """
    record = RateLimitViolation(user_id=user_id, action=action, is_hard_limit=is_hard_limit)
    session.add(record)
    # Push the pending insert to the database now; committing is left to the caller.
    session.flush()
    return record

38 

39 

def _user_has_violated_rate_limit_in_past_time_interval(
    session: "Session", user_id: int, action: RateLimitAction, is_hard_limit: bool
) -> bool:
    """
    Tell whether a matching RateLimitViolation was already recorded within the last RATE_LIMIT_INTERVAL.

    A violation matches when it has the same user, the same action and the same
    hard/warning flag, and was created no earlier than ``now() - RATE_LIMIT_INTERVAL``.
    """
    window_start = now() - RATE_LIMIT_INTERVAL
    matching_violation = exists().where(
        RateLimitViolation.user_id == user_id,
        RateLimitViolation.action == action,
        RateLimitViolation.created >= window_start,
        RateLimitViolation.is_hard_limit == is_hard_limit,
    )
    # SELECT EXISTS(...) yields exactly one row with one boolean column.
    return session.execute(select(matching_violation)).scalar_one()

54 

55 

def process_rate_limits_and_check_abort(session: "Session", user_id: int, action: RateLimitAction) -> bool:
    """
    Evaluate the user's recent activity for ``action`` against its hard and warning limits.

    For each limit that has been reached, a violation is stored and a report email is sent,
    unless a violation of the same kind already exists within the last rate limit interval.

    Returns True if the hard limit has been reached (the caller should abort), False otherwise.
    """
    definition = RATE_LIMIT_DEFINITIONS[action]
    recent_count = definition.count_actions_query(session=session, user_id=user_id)

    # The hard limit is checked first so that reaching it aborts before the warning limit is looked at.
    thresholds = (
        (definition.hard_limit, True),
        (definition.warning_limit, False),
    )
    for threshold, hard in thresholds:
        if recent_count < threshold:
            continue

        already_reported = _user_has_violated_rate_limit_in_past_time_interval(
            session=session, user_id=user_id, action=action, is_hard_limit=hard
        )
        if not already_reported:
            # Only the first violation of this kind within the interval is stored and reported.
            violation = _save_rate_limit_violation(
                session=session, user_id=user_id, action=action, is_hard_limit=hard
            )
            user_events = _get_user_events_in_past_time_interval(session=session, user_id=user_id)
            send_rate_limit_violation_report_email(
                session=session, rate_limit_violation=violation, threshold=threshold, events=user_events
            )

        if hard:
            return True

    return False