Coverage for app / backend / src / couchers / sql.py: 95%
67 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-19 14:14 +0000
1from typing import TYPE_CHECKING, Any
3from sqlalchemy import ColumnElement, and_, false, or_, select, true
4from sqlalchemy.orm import InstrumentedAttribute, aliased
5from sqlalchemy.sql import Select, exists, union
7from couchers.context import CouchersContext
8from couchers.models import (
9 EventOccurrence,
10 FriendRelationship,
11 GroupChat,
12 HostRequest,
13 ModerationState,
14 ModerationVisibility,
15 SignupFlow,
16 User,
17 UserBlock,
18)
19from couchers.utils import is_valid_email, is_valid_user_id, is_valid_username
if TYPE_CHECKING:
    # Imported for annotations only; never executed at runtime.
    # NOTE(review): presumably guarded to avoid an import cycle with materialized_views -- confirm.
    from couchers.materialized_views import LiteUser

    # Model classes (not instances) accepted by the username/email/id lookup helpers.
    type _UserLike = type[User | LiteUser | SignupFlow]
    # Model classes accepted by the user visibility helpers.
    type _User = type[User | LiteUser]
    # Model classes accepted by the moderated-content visibility helpers; the helpers read
    # `moderation_state_id` and `__moderation_author_column__` from them.
    type _ModeratedContent = type[HostRequest | GroupChat | FriendRelationship | EventOccurrence]
def username_or_email(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter matching `value` against the username or email column of `table`

    A valid username is matched on `table.username`. Otherwise a valid email is matched on
    `table.email`, but only if the table has an `email` attribute. Anything else yields a
    condition that matches no rows.
    """
    if is_valid_username(value):
        criterion = table.username == value
    elif is_valid_email(value) and hasattr(table, "email"):
        criterion = table.email == value
    else:
        # neither form applies: constant FALSE, so the query returns nothing
        criterion = false()
    return criterion
def username_or_id(value: str, table: _UserLike = User) -> ColumnElement[bool]:
    """
    Builds a filter matching `value` against the username or id column of `table`

    The username interpretation is tried first, then the user id interpretation. If neither
    validator accepts the value, the returned condition matches no rows.
    """
    if is_valid_username(value):
        return table.username == value

    if is_valid_user_id(value):
        # NOTE(review): `value` is still a str here and is compared to `table.id` as-is;
        # presumably the driver/database coerces it -- confirm.
        return table.id == value

    # nothing matched: constant FALSE, so the query returns nothing
    return false()
def username_or_email_or_id(value: str) -> ColumnElement[bool]:
    """
    Builds a filter on User matching `value` as a username, an email, or a user id

    Should only be used for admin APIs, etc.

    The interpretations are tried in that order. If none applies, the returned condition
    matches no rows.
    """
    if is_valid_username(value):
        criterion = User.username == value
    elif is_valid_email(value):
        criterion = User.email == value
    elif is_valid_user_id(value):
        criterion = User.id == value
    else:
        # no fields match, this will return no rows
        criterion = false()
    return criterion
def users_visible(context: CouchersContext, table: _User = User) -> ColumnElement[bool]:
    """
    Condition that hides users who should not be visible: blocked, deleted, or banned

    Applies to `table` directly, so the table must already be joined/selected from in the
    query this condition is attached to.
    """
    # ids of everyone in a block relationship (either direction) with the calling user
    blocked_either_way = _relevant_user_blocks(context.user_id)
    not_blocked = ~table.id.in_(blocked_either_way)
    return and_(table.is_visible, not_blocked)
69def where_users_column_visible[T: tuple[Any, ...]](
70 query: Select[T], context: CouchersContext, column: InstrumentedAttribute[int]
71) -> Select[T]:
72 """
73 Filters the given column, not yet joined/selected from
74 """
75 hidden_users = _relevant_user_blocks(context.user_id)
76 aliased_user = aliased(User)
77 return (
78 query.join(aliased_user, aliased_user.id == column)
79 .where(aliased_user.is_visible)
80 .where(~aliased_user.id.in_(hidden_users))
81 )
def users_visible_to_each_other(user1: _User, user2: _User) -> ColumnElement[bool]:
    """
    Condition that holds when two users are mutually visible.

    Requires that:
    - both users are visible (not deleted/banned)
    - no UserBlock row exists between them, in either direction

    Use this when both User tables are already joined/selected in the query.
    """
    first_blocks_second = and_(
        UserBlock.blocking_user_id == user1.id,
        UserBlock.blocked_user_id == user2.id,
    )
    second_blocks_first = and_(
        UserBlock.blocking_user_id == user2.id,
        UserBlock.blocked_user_id == user1.id,
    )
    block_between_them = exists(
        select(1).select_from(UserBlock).where(or_(first_blocks_second, second_blocks_first))
    )
    return and_(user1.is_visible, user2.is_visible, ~block_between_them)
110def where_user_columns_visible_to_each_other[T: tuple[Any, ...]](
111 query: Select[T], column1: InstrumentedAttribute[int], column2: InstrumentedAttribute[int]
112) -> Select[T]:
113 """
114 Filters to ensure two users are mutually visible to each other.
116 Checks that:
117 - Both users are visible (not deleted/banned)
118 - Neither user has blocked the other (bidirectional check)
120 Use this when you have two user_id columns that haven't been joined yet.
121 This will join both User tables and apply the visibility checks.
122 """
123 user1 = aliased(User)
124 user2 = aliased(User)
125 return (
126 query.join(user1, user1.id == column1)
127 .join(user2, user2.id == column2)
128 .where(user1.is_visible)
129 .where(user2.is_visible)
130 .where(
131 ~exists(
132 select(1)
133 .select_from(UserBlock)
134 .where(
135 or_(
136 and_(UserBlock.blocking_user_id == user1.id, UserBlock.blocked_user_id == user2.id),
137 and_(UserBlock.blocking_user_id == user2.id, UserBlock.blocked_user_id == user1.id),
138 )
139 )
140 )
141 )
142 )
145def where_moderated_content_visible_to_user_column[T: tuple[Any, ...]](
146 query: Select[T],
147 table: _ModeratedContent,
148 user_id_column: InstrumentedAttribute[int],
149 is_list_operation: bool = False,
150) -> Select[T]:
151 aliased_mod_state = aliased(ModerationState)
152 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible]
154 # UNLISTED content is visible in single-item operations but not in lists
155 if not is_list_operation: 155 ↛ 159line 155 didn't jump to line 159 because the condition on line 155 was always true
156 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted)
158 # Authors can always see their own SHADOWED content
159 conditions.append(
160 and_(
161 aliased_mod_state.visibility == ModerationVisibility.shadowed,
162 getattr(table, table.__moderation_author_column__) == user_id_column,
163 )
164 )
166 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions))
169def where_moderated_content_visible[T: tuple[Any, ...]](
170 query: Select[T],
171 context: CouchersContext,
172 table: _ModeratedContent,
173 is_list_operation: bool = False,
174) -> Select[T]:
175 aliased_mod_state = aliased(ModerationState)
176 conditions = [aliased_mod_state.visibility == ModerationVisibility.visible]
178 # UNLISTED content is visible in single-item operations but not in lists
179 if not is_list_operation:
180 conditions.append(aliased_mod_state.visibility == ModerationVisibility.unlisted)
182 # Authors can always see their own SHADOWED content
183 if context.is_logged_in(): 183 ↛ 191line 183 didn't jump to line 191 because the condition on line 183 was always true
184 conditions.append(
185 and_(
186 aliased_mod_state.visibility == ModerationVisibility.shadowed,
187 getattr(table, table.__moderation_author_column__) == context.user_id,
188 )
189 )
191 return query.join(aliased_mod_state, aliased_mod_state.id == table.moderation_state_id).where(or_(*conditions))
def moderation_state_column_visible(
    context: CouchersContext,
    column: InstrumentedAttribute[int | None],
) -> ColumnElement[bool]:
    """
    Condition on whether the moderation state referenced by `column` is visible.

    Use this when you have a moderation_state_id column on a table that's not the moderated
    content itself (e.g., Notification.moderation_state_id).

    The condition evaluates to True when:
    - The column is NULL (non-moderated content), OR
    - A HostRequest, GroupChat, FriendRelationship, or EventOccurrence row with that
      moderation_state_id exists and is visible per where_moderated_content_visible
      (called with its default is_list_operation)

    TODO: if you use this with a non-null column, check what's going on
    """
    moderated_models = (HostRequest, GroupChat, FriendRelationship, EventOccurrence)

    # one EXISTS subquery per moderated model, in the order listed above
    linked_content_visible = [
        exists(
            where_moderated_content_visible(
                select(model).where(model.moderation_state_id == column),
                context,
                model,
            )
        )
        for model in moderated_models
    ]

    return or_(column.is_(None), *linked_content_visible)
def _relevant_user_blocks(user_id: int) -> Select[tuple[int]]:
    """
    Selects the ids of users that should be hidden from `user_id` because of blocks

    The result is the UNION of the users this user has blocked and the users that have
    blocked this user, wrapped in a subquery and selected from.
    """
    blocked_by_user = select(UserBlock.blocked_user_id).where(UserBlock.blocking_user_id == user_id)
    blocking_the_user = select(UserBlock.blocking_user_id).where(UserBlock.blocked_user_id == user_id)

    either_direction = union(blocked_by_user, blocking_the_user).subquery()
    return select(either_direction)
def to_bool(value: bool) -> ColumnElement[bool]:
    """
    Converts a Python truth value into the SQL constant true() or false()
    """
    if value:
        return true()
    return false()