Coverage for app/backend/src/couchers/sql.py: 96%
73 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 09:29 +0000
1from typing import TYPE_CHECKING, Any
3from sqlalchemy import ColumnElement, and_, false, or_, select, true
4from sqlalchemy.orm import InstrumentedAttribute, aliased
5from sqlalchemy.sql import Select, exists, union
7from couchers.models import (
8 ModerationState,
9 ModerationVisibility,
10 SignupFlow,
11 User,
12 UserBlock,
13 get_moderated_models,
14)
15from couchers.utils import is_valid_email, is_valid_user_id, is_valid_username
if TYPE_CHECKING:
    # Imported for annotations only: this branch is skipped at runtime.
    from couchers.context import CouchersContext
    from couchers.materialized_views import LiteUser
    from couchers.models.moderation import ModeratedContent

    # Classes accepted by the username/email/id lookup helpers below.
    type _UserLike = type[User | LiteUser | SignupFlow]
    # Classes accepted by the user visibility helpers below.
    type _User = type[User | LiteUser]
def username_or_email(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter matching `value` against the username or email column of `table`.

    A valid username is compared against `table.username`. Otherwise a valid email is compared against
    `table.email`, but only when `table` has an `email` attribute. Anything else yields a constant
    false clause.
    """
    if not is_valid_username(value):
        if is_valid_email(value) and hasattr(table, "email"):
            return table.email == value
        # neither field applies, so the clause matches no rows
        return false()
    return table.username == value
def username_or_id(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter matching `value` against the username or id column of `table`.

    A valid username is compared against `table.username`; otherwise a valid user id is converted to
    an int and compared against `table.id`. Anything else yields a constant false clause.
    """
    if is_valid_username(value):
        return table.username == value
    if is_valid_user_id(value):
        return table.id == int(value)
    # neither field applies, so the clause matches no rows
    return false()
def username_or_email_or_id(value: str) -> ColumnElement[bool]:
    """
    Builds a filter on User matching `value` as a username, then as an email, then as a user id.

    Should only be used for admin APIs, etc.
    """
    if is_valid_username(value):
        return User.username == value
    if is_valid_email(value):
        return User.email == value
    if is_valid_user_id(value):
        return User.id == int(value)
    # none of the three formats applies, so the clause matches no rows
    return false()
def _shadow_clause(context: CouchersContext, table: _User) -> ColumnElement[bool]:
    """
    Builds the clause that hides shadowed users from everyone but themselves.

    Logged-out contexts only match rows whose `shadowed_at` is NULL. Logged-in contexts additionally
    match the row whose id equals `context.user_id`.
    """
    not_shadowed = table.shadowed_at.is_(None)
    if not context.is_logged_in():
        return not_shadowed
    return or_(not_shadowed, table.id == context.user_id)
def _users_block_each_other(
    user_a: InstrumentedAttribute[int], user_b: InstrumentedAttribute[int]
) -> ColumnElement[bool]:
    """True when either of the two users (referenced by id columns) has blocked the other."""
    # A block is directional, so check both directions.
    a_blocks_b = and_(UserBlock.blocking_user_id == user_a, UserBlock.blocked_user_id == user_b)
    b_blocks_a = and_(UserBlock.blocking_user_id == user_b, UserBlock.blocked_user_id == user_a)
    block_rows = select(1).select_from(UserBlock).where(or_(a_blocks_b, b_blocks_a))
    return exists(block_rows)
def users_visible(context: CouchersContext, table: _User = User) -> ColumnElement[bool]:
    """
    Filters out users that should not be visible: blocked, deleted, banned, or shadowed (to others).

    Filters the given table, assuming it's already joined/selected from
    """
    # NOTE(review): context.user_id is read unconditionally here, while _shadow_clause guards on
    # context.is_logged_in() -- confirm user_id is safe to read for logged-out contexts.
    not_blocked = ~table.id.in_(_relevant_user_blocks(context.user_id))
    return and_(table.is_visible, _shadow_clause(context, table), not_blocked)
88def where_users_column_visible[T: tuple[Any, ...]](
89 query: Select[T], context: CouchersContext, column: InstrumentedAttribute[int]
90) -> Select[T]:
91 """
92 Filters the given column, not yet joined/selected from
93 """
94 # EXISTS on the bare User table rather than aliased(User): aliasing the wide User entity per call
95 # is a major CPU hotspot (rebuilds the ORM proxy index). correlate_except keeps this User local to
96 # the subquery so it doesn't bind to a bare User already in the outer query (e.g. jobs/handlers.py).
97 return query.where(
98 exists(
99 select(1)
100 .select_from(User)
101 .where(User.id == column)
102 .where(users_visible(context, User))
103 .correlate_except(User)
104 )
105 )
def users_visible_to_each_other(*, self_user: _User, other_user: _User) -> ColumnElement[bool]:
    """
    Filters to ensure other_user is visible to self_user, and that they haven't blocked each other.

    Use this when both User tables are already joined/selected in the query.
    """
    # Only other_user's shadow state is checked: the clause is about what self_user can see.
    other_not_shadowed = other_user.shadowed_at.is_(None)
    no_block_between = ~_users_block_each_other(self_user.id, other_user.id)
    return and_(self_user.is_visible, other_user.is_visible, other_not_shadowed, no_block_between)
122def where_user_columns_visible_to_each_other[T: tuple[Any, ...]](
123 query: Select[T], *, self_column: InstrumentedAttribute[int], other_column: InstrumentedAttribute[int]
124) -> Select[T]:
125 """
126 Filters to ensure the user in other_column is visible to the user in self_column, and that they
127 haven't blocked each other.
129 Use this when you have two user_id columns that haven't been joined yet. This will join both
130 User tables and apply the visibility checks.
131 """
132 self_user = aliased(User)
133 other_user = aliased(User)
134 return (
135 query.join(self_user, self_user.id == self_column)
136 .join(other_user, other_user.id == other_column)
137 .where(self_user.is_visible)
138 .where(other_user.is_visible)
139 .where(other_user.shadowed_at.is_(None))
140 .where(~_users_block_each_other(self_user.id, other_user.id))
141 )
144def where_moderated_content_visible_to_user_column[T: tuple[Any, ...]](
145 query: Select[T],
146 table: type[ModeratedContent],
147 user_id_column: InstrumentedAttribute[int],
148 is_list_operation: bool = False,
149) -> Select[T]:
150 entry = get_moderated_models()[table.__moderation_object_type__]
151 aliased_mod_state = aliased(ModerationState)
152 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible]
154 # UNLISTED content is visible in single-item operations but not in lists
155 if not is_list_operation: 155 ↛ 159line 155 didn't jump to line 159 because the condition on line 155 was always true
156 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted)
158 # Authors can always see their own SHADOWED content
159 conditions.append(
160 and_(
161 aliased_mod_state.visibility == ModerationVisibility.shadowed,
162 entry.author_column == user_id_column,
163 )
164 )
166 return query.join(aliased_mod_state, aliased_mod_state.id == entry.moderation_state_id_column).where(
167 or_(*conditions)
168 )
171def where_moderated_content_visible[T: tuple[Any, ...]](
172 query: Select[T],
173 context: CouchersContext,
174 table: type[ModeratedContent],
175 is_list_operation: bool = False,
176) -> Select[T]:
177 entry = get_moderated_models()[table.__moderation_object_type__]
178 aliased_mod_state = aliased(ModerationState)
179 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible]
181 # UNLISTED content is visible in single-item operations but not in lists
182 if not is_list_operation:
183 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted)
185 # Authors can always see their own SHADOWED content
186 if context.is_logged_in():
187 conditions.append(
188 and_(
189 aliased_mod_state.visibility == ModerationVisibility.shadowed,
190 entry.author_column == context.user_id,
191 )
192 )
194 return query.join(aliased_mod_state, aliased_mod_state.id == entry.moderation_state_id_column).where(
195 or_(*conditions)
196 )
def moderation_state_column_visible(
    context: CouchersContext,
    column: InstrumentedAttribute[int | None],
) -> ColumnElement[bool]:
    """
    Filters based on whether the moderation state referenced by the column is visible.

    Use this when you have a moderation_state_id column on a table that's not the moderated
    content itself (e.g., Notification.moderation_state_id).

    The condition evaluates to True when:
    - The column is NULL (non-moderated content), OR
    - The linked moderation state has visibility 'visible' or 'unlisted', OR
    - The linked moderation state has visibility 'shadowed' and the current user is the author
    """
    mod_state = aliased(ModerationState)

    # For 'shadowed' content, look up the moderated content via object_type/object_id to check the
    # author: one EXISTS per registered moderated model. Logged-out contexts get no such clause.
    own_shadowed: list[ColumnElement[bool]] = []
    if context.is_logged_in():
        own_shadowed = [
            and_(
                # Authorship only grants access to SHADOWED states. Without this check, being the
                # author would match a state of any visibility, which contradicts the contract above.
                mod_state.visibility == ModerationVisibility.shadowed,
                mod_state.object_type == entry.object_type,
                exists(
                    select(1)
                    .select_from(entry.model)
                    .where(entry.object_id_column == mod_state.object_id)
                    .where(entry.author_column == context.user_id)
                ),
            )
            for entry in get_moderated_models().values()
        ]

    return or_(
        column.is_(None),
        exists(
            select(mod_state.id).where(
                mod_state.id == column,
                or_(
                    mod_state.visibility == ModerationVisibility.visible,
                    mod_state.visibility == ModerationVisibility.unlisted,
                    *own_shadowed,
                ),
            )
        ),
    )
def _relevant_user_blocks(user_id: int) -> Select[tuple[int]]:
    """
    Gets a list of blocked user IDs or users that have blocked this user: those should be hidden
    """
    # Users that `user_id` has blocked
    blocked_by_user = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user_id)
    # Users that have blocked `user_id`
    blocking_the_user = select(UserBlock.blocking_user_id).where(UserBlock.blocked_user_id == user_id)

    either_direction = union(blocked_by_user, blocking_the_user).subquery()
    return select(either_direction)
def to_bool(value: bool) -> ColumnElement[bool]:
    """
    Converts a Python bool into the corresponding constant SQL boolean expression.
    """
    if value:
        return true()
    return false()