Coverage for src/couchers/rate_limits/check.py: 97%
31 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-12 05:54 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-07-12 05:54 +0000
1import logging
2from typing import TYPE_CHECKING
4from sqlalchemy import exists, select
6from couchers.models import RateLimitAction, RateLimitViolation
7from couchers.rate_limits.definitions import RATE_LIMIT_DEFINITIONS, RATE_LIMIT_INTERVAL
8from couchers.tasks import send_rate_limit_violation_report_email
9from couchers.utils import now
11if TYPE_CHECKING:
12 from sqlalchemy.orm import Session
15logger = logging.getLogger(__name__)
18def _get_user_events_in_past_time_interval(session, user_id: int) -> dict[RateLimitAction, list[dict]]:
19 """Get all relevant events for the user in the last rate limit interval for the mod email."""
20 return {
21 action: RATE_LIMIT_DEFINITIONS[action].mod_email_information_query(session, user_id)
22 for action in RateLimitAction
23 }
26def _save_rate_limit_violation(
27 session: "Session", user_id: int, action: RateLimitAction, is_hard_limit: bool
28) -> RateLimitViolation:
29 """Save a rate limit violation to the database and return it."""
30 violation = RateLimitViolation(
31 user_id=user_id,
32 action=action,
33 is_hard_limit=is_hard_limit,
34 )
35 session.add(violation)
36 session.flush()
37 return violation
40def _user_has_violated_rate_limit_in_past_time_interval(
41 session: "Session", user_id: int, action: RateLimitAction, is_hard_limit: bool
42) -> bool:
43 """Check if a RateLimitViolation for the user for the given action exists in the last RATE_LIMIT_INTERVAL."""
44 return session.execute(
45 select(
46 exists().where(
47 RateLimitViolation.user_id == user_id,
48 RateLimitViolation.action == action,
49 RateLimitViolation.created >= now() - RATE_LIMIT_INTERVAL,
50 RateLimitViolation.is_hard_limit == is_hard_limit,
51 )
52 )
53 ).scalar_one()
56def process_rate_limits_and_check_abort(session: "Session", user_id: int, action: RateLimitAction) -> bool:
57 """
58 Check if the user has reached a rate limit. Notify the moderation team in a separate background job if so.
60 Returns True if the user has reached a hard rate limit.
61 """
62 rate_limit_definition = RATE_LIMIT_DEFINITIONS[action]
63 count_last_interval = rate_limit_definition.count_actions_query(session=session, user_id=user_id)
64 for limit, is_hard_limit in [
65 (rate_limit_definition.hard_limit, True),
66 (rate_limit_definition.warning_limit, False),
67 ]:
68 if count_last_interval >= limit:
69 if not _user_has_violated_rate_limit_in_past_time_interval(
70 session=session, user_id=user_id, action=action, is_hard_limit=is_hard_limit
71 ):
72 rate_limit_violation = _save_rate_limit_violation(
73 session=session, user_id=user_id, action=action, is_hard_limit=is_hard_limit
74 )
75 events = _get_user_events_in_past_time_interval(session=session, user_id=user_id)
76 send_rate_limit_violation_report_email(
77 session=session, rate_limit_violation=rate_limit_violation, threshold=limit, events=events
78 )
79 if is_hard_limit:
80 return True
81 return False