Coverage for src/couchers/rate_limits/check.py: 100%
30 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-12-20 11:53 +0000
1import logging
2from collections.abc import Sequence
4from sqlalchemy import RowMapping, exists, select
5from sqlalchemy.orm import Session
7from couchers.models import RateLimitAction, RateLimitViolation
8from couchers.rate_limits.definitions import RATE_LIMIT_DEFINITIONS, RATE_LIMIT_INTERVAL
9from couchers.tasks import send_rate_limit_violation_report_email
10from couchers.utils import now
12logger = logging.getLogger(__name__)
def _get_user_events_in_past_time_interval(
    session: Session, user_id: int
) -> dict[RateLimitAction, Sequence[RowMapping]]:
    """
    Collect, for every rate-limited action, the user's events that go into the mod email.

    Each member of RateLimitAction is looked up in RATE_LIMIT_DEFINITIONS and its
    mod_email_information_query is run for the given user. Restricting the rows to the
    last rate limit interval is left to that query; nothing is filtered here.
    """
    events_by_action: dict[RateLimitAction, Sequence[RowMapping]] = {}
    for rate_limited_action in RateLimitAction:
        definition = RATE_LIMIT_DEFINITIONS[rate_limited_action]
        events_by_action[rate_limited_action] = definition.mod_email_information_query(session, user_id)
    return events_by_action
def _save_rate_limit_violation(
    session: Session, user_id: int, action: RateLimitAction, is_hard_limit: bool
) -> RateLimitViolation:
    """
    Record that the user hit a rate limit for the given action and return the new row.

    The violation is added to the session and flushed; committing is left to the caller.
    """
    new_violation = RateLimitViolation(user_id=user_id, action=action, is_hard_limit=is_hard_limit)
    session.add(new_violation)
    # Push the pending INSERT to the database now, presumably so the returned object
    # carries its database-generated values (verify against the model).
    session.flush()
    return new_violation
def _user_has_violated_rate_limit_in_past_time_interval(
    session: Session, user_id: int, action: RateLimitAction, is_hard_limit: bool
) -> bool:
    """
    Tell whether a matching RateLimitViolation was already recorded within the last RATE_LIMIT_INTERVAL.

    A violation matches when it belongs to the same user, concerns the same action, and has
    the same is_hard_limit flag as requested.
    """
    # Oldest creation time that still falls inside the current interval.
    window_start = now() - RATE_LIMIT_INTERVAL
    recent_matching_violation = exists().where(
        RateLimitViolation.user_id == user_id,
        RateLimitViolation.action == action,
        RateLimitViolation.created >= window_start,
        RateLimitViolation.is_hard_limit == is_hard_limit,
    )
    # SELECT EXISTS(...) yields exactly one row with one column.
    result = session.execute(select(recent_matching_violation))
    return result.scalar_one()
def process_rate_limits_and_check_abort(session: Session, user_id: int, action: RateLimitAction) -> bool:
    """
    Evaluate the user's recent activity for `action` against its rate limits.

    The hard limit is checked before the warning limit. The first time a limit is reached
    within the rate limit interval, a violation is saved and the moderation team is
    notified in a separate background job; later hits of the same limit within the
    interval are not reported again.

    Returns True if the user has reached the hard rate limit (the caller should abort the
    action), otherwise False.
    """
    definition = RATE_LIMIT_DEFINITIONS[action]
    action_count = definition.count_actions_query(session, user_id)

    # Hard limit first: reaching it returns immediately, so the warning limit is not
    # evaluated in that case.
    thresholds = (
        (definition.hard_limit, True),
        (definition.warning_limit, False),
    )
    for threshold, hard in thresholds:
        if action_count < threshold:
            continue

        already_reported = _user_has_violated_rate_limit_in_past_time_interval(
            session=session, user_id=user_id, action=action, is_hard_limit=hard
        )
        if not already_reported:
            violation = _save_rate_limit_violation(
                session=session, user_id=user_id, action=action, is_hard_limit=hard
            )
            recent_events = _get_user_events_in_past_time_interval(session=session, user_id=user_id)
            send_rate_limit_violation_report_email(
                session=session, rate_limit_violation=violation, threshold=threshold, events=recent_events
            )

        if hard:
            return True

    return False