Coverage for app / backend / src / couchers / sql.py: 94%
67 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 11:04 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-15 11:04 +0000
1from typing import TYPE_CHECKING, Any
3from sqlalchemy import ColumnElement, and_, false, or_, select, true
4from sqlalchemy.orm import InstrumentedAttribute, aliased
5from sqlalchemy.sql import Select, exists, union
7from couchers.context import CouchersContext
8from couchers.models import (
9 EventOccurrence,
10 FriendRelationship,
11 GroupChat,
12 HostRequest,
13 ModerationObjectType,
14 ModerationState,
15 ModerationVisibility,
16 SignupFlow,
17 User,
18 UserBlock,
19)
20from couchers.utils import is_valid_email, is_valid_user_id, is_valid_username
22if TYPE_CHECKING:
23 from couchers.materialized_views import LiteUser
25 type _UserLike = type[User | LiteUser | SignupFlow]
26 type _User = type[User | LiteUser]
27 type _ModeratedContent = type[HostRequest | GroupChat | FriendRelationship | EventOccurrence]
30def username_or_email(value: str, table: _UserLike = User) -> ColumnElement[bool]:
31 if is_valid_username(value):
32 return table.username == value
33 elif is_valid_email(value) and hasattr(table, "email"):
34 return table.email == value
35 # no fields match, this will return no rows
36 return false()
39def username_or_id(value: str, table: _UserLike = User) -> ColumnElement[bool]:
40 if is_valid_username(value):
41 return table.username == value
42 elif is_valid_user_id(value):
43 return table.id == value
44 # no fields match, this will return no rows
45 return false()
48def username_or_email_or_id(value: str) -> ColumnElement[bool]:
49 # Should only be used for admin APIs, etc.
50 if is_valid_username(value):
51 return User.username == value
52 elif is_valid_email(value):
53 return User.email == value
54 elif is_valid_user_id(value): 54 ↛ 57line 54 didn't jump to line 57 because the condition on line 54 was always true
55 return User.id == value
56 # no fields match, this will return no rows
57 return false()
60def users_visible(context: CouchersContext, table: _User = User) -> ColumnElement[bool]:
61 """
62 Filters out users that should not be visible: blocked, deleted, or banned
64 Filters the given table, assuming it's already joined/selected from
65 """
66 hidden_users = _relevant_user_blocks(context.user_id)
67 return and_(table.is_visible, ~table.id.in_(hidden_users))
70def where_users_column_visible[T: tuple[Any, ...]](
71 query: Select[T], context: CouchersContext, column: InstrumentedAttribute[int]
72) -> Select[T]:
73 """
74 Filters the given column, not yet joined/selected from
75 """
76 hidden_users = _relevant_user_blocks(context.user_id)
77 aliased_user = aliased(User)
78 return (
79 query.join(aliased_user, aliased_user.id == column)
80 .where(aliased_user.is_visible)
81 .where(~aliased_user.id.in_(hidden_users))
82 )
85def users_visible_to_each_other(user1: _User, user2: _User) -> ColumnElement[bool]:
86 """
87 Filters to ensure two users are mutually visible to each other.
89 Checks that:
90 - Both users are visible (not deleted/banned)
91 - Neither user has blocked the other (bidirectional check)
93 Use this when both User tables are already joined/selected in the query.
94 """
95 return and_(
96 user1.is_visible,
97 user2.is_visible,
98 ~exists(
99 select(1)
100 .select_from(UserBlock)
101 .where(
102 or_(
103 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id),
104 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id),
105 )
106 )
107 ),
108 )
111def where_user_columns_visible_to_each_other[T: tuple[Any, ...]](
112 query: Select[T], column1: InstrumentedAttribute[int], column2: InstrumentedAttribute[int]
113) -> Select[T]:
114 """
115 Filters to ensure two users are mutually visible to each other.
117 Checks that:
118 - Both users are visible (not deleted/banned)
119 - Neither user has blocked the other (bidirectional check)
121 Use this when you have two user_id columns that haven't been joined yet.
122 This will join both User tables and apply the visibility checks.
123 """
124 user1 = aliased(User)
125 user2 = aliased(User)
126 return (
127 query.join(user1, user1.id == column1)
128 .join(user2, user2.id == column2)
129 .where(user1.is_visible)
130 .where(user2.is_visible)
131 .where(
132 ~exists(
133 select(1)
134 .select_from(UserBlock)
135 .where(
136 or_(
137 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id),
138 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id),
139 )
140 )
141 )
142 )
143 )
146def where_moderated_content_visible_to_user_column[T: tuple[Any, ...]](
147 query: Select[T],
148 table: _ModeratedContent,
149 user_id_column: InstrumentedAttribute[int],
150 is_list_operation: bool = False,
151) -> Select[T]:
152 aliased_mod_state = aliased(ModerationState)
153 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible]
155 # UNLISTED content is visible in single-item operations but not in lists
156 if not is_list_operation: 156 ↛ 160line 156 didn't jump to line 160 because the condition on line 156 was always true
157 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted)
159 # Authors can always see their own SHADOWED content
160 conditions.append(
161 and_(
162 aliased_mod_state.visibility == ModerationVisibility.shadowed,
163 getattr(table, table.__moderation_author_column__) == user_id_column,
164 )
165 )
167 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions))
170def where_moderated_content_visible[T: tuple[Any, ...]](
171 query: Select[T],
172 context: CouchersContext,
173 table: _ModeratedContent,
174 is_list_operation: bool = False,
175) -> Select[T]:
176 aliased_mod_state = aliased(ModerationState)
177 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible]
179 # UNLISTED content is visible in single-item operations but not in lists
180 if not is_list_operation:
181 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted)
183 # Authors can always see their own SHADOWED content
184 if context.is_logged_in(): 184 ↛ 192line 184 didn't jump to line 192 because the condition on line 184 was always true
185 conditions.append(
186 and_(
187 aliased_mod_state.visibility == ModerationVisibility.shadowed,
188 getattr(table, table.__moderation_author_column__) == context.user_id,
189 )
190 )
192 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions))
195def moderation_state_column_visible(
196 context: CouchersContext,
197 column: InstrumentedAttribute[int | None],
198) -> ColumnElement[bool]:
199 """
200 Filters based on whether the moderation state referenced by the column is visible.
202 Use this when you have a moderation_state_id column on a table that's not the moderated
203 content itself (e.g., Notification.moderation_state_id).
205 The condition evaluates to True when:
206 - The column is NULL (non-moderated content), OR
207 - The linked moderation state has visibility 'visible' or 'unlisted', OR
208 - The linked moderation state has visibility 'shadowed' and the current user is the author
209 """
210 aliased_mod_state = aliased(ModerationState)
212 # For 'shadowed' content, check if the user is the author by looking up the content table
213 # using object_type and object_id on the moderation_state row
214 shadowed_conditions: list[ColumnElement[bool]] = []
215 if context.is_logged_in(): 215 ↛ 260line 215 didn't jump to line 260 because the condition on line 215 was always true
216 shadowed_conditions = [
217 and_(
218 aliased_mod_state.visibility == ModerationVisibility.shadowed,
219 or_(
220 and_(
221 aliased_mod_state.object_type == ModerationObjectType.host_request,
222 exists(
223 select(HostRequest.conversation_id).where(
224 HostRequest.conversation_id == aliased_mod_state.object_id,
225 HostRequest.initiator_user_id == context.user_id,
226 )
227 ),
228 ),
229 and_(
230 aliased_mod_state.object_type == ModerationObjectType.group_chat,
231 exists(
232 select(GroupChat.conversation_id).where(
233 GroupChat.conversation_id == aliased_mod_state.object_id,
234 GroupChat.creator_id == context.user_id,
235 )
236 ),
237 ),
238 and_(
239 aliased_mod_state.object_type == ModerationObjectType.friend_request,
240 exists(
241 select(FriendRelationship.id).where(
242 FriendRelationship.id == aliased_mod_state.object_id,
243 FriendRelationship.from_user_id == context.user_id,
244 )
245 ),
246 ),
247 and_(
248 aliased_mod_state.object_type == ModerationObjectType.event_occurrence,
249 exists(
250 select(EventOccurrence.id).where(
251 EventOccurrence.id == aliased_mod_state.object_id,
252 EventOccurrence.creator_user_id == context.user_id,
253 )
254 ),
255 ),
256 ),
257 )
258 ]
260 return or_(
261 column.is_(None),
262 exists(
263 select(aliased_mod_state.id).where(
264 aliased_mod_state.id == column,
265 or_(
266 aliased_mod_state.visibility == ModerationVisibility.visible,
267 aliased_mod_state.visibility == ModerationVisibility.unlisted,
268 *shadowed_conditions,
269 ),
270 )
271 ),
272 )
275def _relevant_user_blocks(user_id: int) -> Select[tuple[int]]:
276 """
277 Gets a list of blocked user IDs or users that have blocked this user: those should be hidden
278 """
279 blocked_users = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user_id)
280 blocking_users = select(UserBlock.blocking_user_id).where(UserBlock.blocked_user_id == user_id)
282 return select(union(blocked_users, blocking_users).subquery())
285def to_bool(value: bool) -> ColumnElement[bool]:
286 return true() if value else false()